forked from yforster/pythomat
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pythomat.py
272 lines (230 loc) · 12.1 KB
/
pythomat.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
#!/usr/bin/env python3
import configparser
import glob
import os
import pathlib
import subprocess
import sys
import time
import traceback
from argparse import ArgumentParser
from datetime import datetime
from typing import List, Tuple, TextIO
from mechanize import Browser
# noinspection PyPep8Naming
class Pythomat:
    """Rule-driven downloader configured through an INI file.

    Every INI section is one rule. A rule's ``mode`` selects how its ``uri``
    is fetched: ``single`` (one file), ``batch`` (every link on a page that
    matches ``pattern``), ``youtube`` (delegated to the ``youtube-dl``
    executable) or ``cms`` / ``moodle`` / ``module`` (delegated to an
    importable module that exposes ``start(section, items, pythomat)``).

    Outcomes are collected per instance in ``downloaded``, ``failed`` and
    ``errors`` as ``(section, detail)`` tuples and printed by ``printReport``.
    """

    # Declared here for type checkers only; the containers themselves are
    # created per instance in __init__ so instances never share results.
    downloaded: List[Tuple[str, str]]
    errors: List[Tuple[str, str]]
    failed: List[Tuple[str, str]]
    logFile: TextIO = None

    def __init__(self):
        """Create a downloader with empty result lists and no log file."""
        self.downloaded = []
        self.errors = []
        self.failed = []
        self.logFile = None

    @staticmethod
    def getConfigFromIni(inipath: str):
        """Parse the INI file at ``inipath`` and return the ConfigParser.

        ``ConfigParser.read`` ignores files it cannot open, so a missing file
        yields a parser without sections.
        """
        ini = configparser.ConfigParser()
        ini.read(inipath)
        return ini

    @staticmethod
    def get_browser(url: str, httpUsername: str, httpPassword: str):
        """Return a mechanize ``Browser`` that identifies itself as Pythomat.

        When both credentials are given they are registered for ``url`` and
        robots.txt handling is switched off.
        """
        br = Browser()
        br.addheaders = [("User-agent", "Pythomat")]
        if httpUsername is not None and httpPassword is not None:
            br.set_handle_robots(False)
            br.add_password(url, httpUsername, httpPassword)
        return br

    @staticmethod
    def _locate(root: str, filename: str, recursive: bool):
        """Return the path of ``filename`` below ``root``, or None if absent.

        With ``recursive`` the whole tree under ``root`` is searched and the
        first match wins; otherwise only ``root`` itself is checked.
        """
        if recursive:
            for directory, _sub_dirs, files in os.walk(root):
                if filename in files:
                    return os.path.join(directory, filename)
            return None
        candidate = os.path.join(root, filename)
        return candidate if os.path.isfile(candidate) else None

    @staticmethod
    def _get_flag(ini: configparser.ConfigParser, section: str, option: str, fallback: bool = False) -> bool:
        """Read a boolean option (1/0, true/false, yes/no, on/off).

        An unparsable value is reported on stderr and replaced by
        ``fallback`` instead of aborting the run.
        """
        try:
            return ini.getboolean(section, option, fallback=fallback)
        except ValueError:
            print(f"[{section}] Ignoring invalid value for \"{option}\"", file=sys.stderr)
            return fallback

    def already_downloaded(self, root: str, filename: str, recursive: bool) -> bool:
        """Tell whether ``filename`` exists in ``root`` (or, if ``recursive``, anywhere below it)."""
        return self._locate(root, filename, recursive) is not None

    def closeLog(self):
        """Close the download log, if one is open."""
        if self.logFile is not None:
            self.logFile.close()
            # Forget the handle so a later reportLog cannot write to a closed file.
            self.logFile = None

    def download(self, section: str, url: str, filename: str = "", saveto: str = "", detect: str = "",
                 detect_recursive: bool = False, createdirs: bool = False, overwrite: bool = False,
                 checklastmodified: bool = True, httpUsername: str = None, httpPassword: str = None,
                 logMessage: str = None, browser: "Browser" = None) -> bool:
        """Download a single file from ``url`` into ``saveto``.

        Args:
            section: Rule name used for reporting.
            url: Address of the file.
            filename: Target file name; derived from the last URL path
                segment (query string removed) when empty.
            saveto: Target directory; empty means the current directory.
            detect: Directory searched for an existing copy.
            detect_recursive: Also search the subdirectories of ``detect``.
            createdirs: Create ``saveto`` if it does not exist.
            overwrite: Replace an existing copy instead of skipping it.
            checklastmodified: With ``overwrite``, only replace the copy when
                the server's ``Last-Modified`` header is newer than it.
            httpUsername: Optional HTTP user name.
            httpPassword: Optional HTTP password.
            logMessage: Text written to the log instead of ``Downloaded``.
            browser: Browser to reuse; a new one is created when omitted.

        Returns:
            True if the file was fetched, False if it was skipped or failed.
            Failures are printed to stderr and recorded via ``reportFailed``;
            they are not raised.
        """
        br = browser if browser is not None else self.get_browser(url, httpUsername, httpPassword)
        uptodate = False
        try:
            if filename == "":
                filename = url.split("/")[-1].split("?")[0]
            # An empty saveto must stay empty: appending "/" would point at
            # the filesystem root instead of the current directory.
            if saveto and not saveto.endswith("/"):
                saveto = saveto + "/"

            existing = self._locate(detect, filename, detect_recursive)
            if existing is not None and overwrite and checklastmodified:
                # Overwriting is enabled and the file has already been downloaded:
                # only fetch again when the server copy is newer.
                br.open(url)
                remote_time = time.strptime(br.response().info()["last-modified"], "%a, %d %b %Y %H:%M:%S GMT")
                # Compare against the copy that was actually found, which may
                # live under `detect` rather than `saveto`.
                local_time = time.gmtime(os.stat(existing).st_mtime)
                do_download = remote_time > local_time
                uptodate = True
            elif existing is not None and not overwrite:
                # Overwriting is disabled and the file has already been downloaded.
                do_download = False
            else:
                do_download = True

            if not do_download:
                if uptodate:
                    print(f"[Ignored] Up-to-date: {url}")
                else:
                    print(f"[Ignored] Already downloaded: {url}")
                return False

            if createdirs and saveto and not os.path.exists(saveto):
                print(f"Creating directory \"{saveto}\" …")
                os.makedirs(saveto, exist_ok=True)
            # The working directory is deliberately left alone: changing into
            # `saveto` made the joins below resolve a relative `saveto` twice.
            target = os.path.join(saveto, filename)
            partial = target + ".tmp"
            print(f"Downloading {url} as \"{filename}\" …")
            br.retrieve(url, partial)
            # os.replace also overwrites an existing target on Windows.
            os.replace(partial, target)
            self.reportFinished(section, filename)
            self.reportLog(section, filename, logMessage)
            return True
        except Exception as ex:
            print(f"[Failed] {url}, Error: {ex}", file=sys.stderr)
            self.reportFailed(section, filename)
        return False

    def download_all(self, section: str, url: str, pattern: str, saveto: str, detect: str,
                     detect_recursive: bool = False, createdirs: bool = False, overwrite: bool = False,
                     httpUsername: str = None, httpPassword: str = None):
        """Download every file linked from the page at ``url`` whose link URL matches ``pattern``.

        ``pattern`` is a regular expression handed to the browser's link
        filter. Each link is resolved against the page it was found on and
        passed to ``download`` together with the remaining arguments.
        Failing to open ``url`` raises; per-file failures do not.
        """
        from urllib.parse import urljoin

        br = self.get_browser(url, httpUsername, httpPassword)
        br.open(url)
        for link in br.links(url_regex=pattern):
            # urljoin covers absolute, host-relative, protocol-relative and
            # document-relative links in one place.
            self.download(section, urljoin(link.base_url, link.url), saveto=saveto, detect=detect,
                          detect_recursive=detect_recursive, createdirs=createdirs, overwrite=overwrite,
                          httpUsername=httpUsername, httpPassword=httpPassword)

    def downloadYoutube(self, section: str, id: str, overwrite=True, saveto=""):
        """Download the YouTube video ``id`` into ``saveto`` using ``youtube-dl``.

        Without ``overwrite`` the video is skipped when a file whose name
        contains ``id`` already exists in ``saveto``. A non-zero exit status
        of ``youtube-dl`` is recorded as a failed download.
        """
        if not overwrite and glob.glob(os.path.join(glob.escape(saveto), f"*{glob.escape(id)}*")):
            return
        url = f"https://www.youtube.com/watch?v={id}"
        template = os.path.join(saveto, "%(title)s-%(id)s.%(ext)s")
        # Argument list instead of a shell string: values from the INI file
        # are never interpreted by a shell.
        exit_code = subprocess.call(["youtube-dl", "-o", template, url])
        if exit_code == 0:
            self.reportFinished(section, id)
        else:
            self.reportFailed(section, id)

    def downloadFromIni(self, ini: configparser.ConfigParser, createdirs: bool, rules: str):
        """Run the rules defined in ``ini``.

        Args:
            ini: Parsed configuration, one section per rule.
            createdirs: Create missing target directories.
            rules: Comma-separated rule names to run, ``"all"`` for every
                rule, or None to run every rule not marked ``skip``.

        Any exception raised while a rule is processed is printed with its
        traceback, recorded via ``reportError`` and does not stop later rules.
        """
        working_dir: str = str(pathlib.Path().absolute())
        rule_list: List[str] = None if rules is None else [rule.strip() for rule in rules.split(",")]

        for section in ini.sections():
            os.chdir(working_dir)  # Reset working dir in case a rule changed it

            if rule_list is not None and section not in rule_list and rules != "all":
                print(f"### Skipping {section} ###")
                continue
            if rule_list is None and self._get_flag(ini, section, "skip"):
                print(f"### Skipping {section} - must be started manually ###")
                continue

            print(f"### Processing {section} ###")
            try:
                self._process_section(ini, section, createdirs)
            except Exception as e:
                print("An error occured", file=sys.stderr)
                print(traceback.format_exc(), file=sys.stderr)
                self.reportError(section, str(e))

    def _process_section(self, ini: configparser.ConfigParser, section: str, createdirs: bool):
        """Execute one rule; exceptions propagate to ``downloadFromIni``."""
        uri = ini.get(section, "uri")
        saveto = ini.get(section, "saveto")
        detect = ini.get(section, "detect", fallback=saveto)
        detect_recursive = self._get_flag(ini, section, "detect_recursive")
        mode = ini.get(section, "mode")
        http_username = ini.get(section, "username", fallback=None)
        http_password = ini.get(section, "password", fallback=None)

        if mode == "batch":
            pattern = ini.get(section, "pattern")
            overwrite = self._get_flag(ini, section, "overwrite")
            self.download_all(section, uri, pattern, saveto, detect, detect_recursive=detect_recursive,
                              createdirs=createdirs, overwrite=overwrite,
                              httpUsername=http_username, httpPassword=http_password)
        elif mode in ("cms", "moodle", "module"):
            # "cms" and "moodle" name their module directly; "module" reads
            # the module name from the rule.
            name = ini.get(section, "module") if mode == "module" else mode
            module = __import__(name, globals=globals())
            items = dict(ini.items(section))
            items["createdirs"] = createdirs
            module.start(section, items, self)
        elif mode == "single":
            name = ini.get(section, "filename", fallback="")
            overwrite = self._get_flag(ini, section, "overwrite")
            self.download(section, uri, name, saveto, detect, detect_recursive=detect_recursive,
                          createdirs=createdirs, overwrite=overwrite,
                          httpUsername=http_username, httpPassword=http_password)
        elif mode == "youtube":
            overwrite = self._get_flag(ini, section, "overwrite")
            self.downloadYoutube(section, uri, overwrite, saveto)
        else:
            print(f"Mode '{mode}' unsupported", file=sys.stderr)
            self.reportError(section, f"Mode '{mode}' unsupported")

    def openLog(self, path: str):
        """Open ``path`` for appending as the download log; None disables logging."""
        if path is not None:
            self.logFile = open(path, "a", encoding="utf-8")

    def printReport(self):
        """Print the errors, failed downloads and finished downloads of this run."""
        print("### Report ###")
        if self.errors:
            print("Errors:")
            for section, message in self.errors:
                print(f"• {section} | {message}")
        if self.failed:
            print("Failed:")
            for section, filename in self.failed:
                print(f"• {section} | {filename}")
        if not self.downloaded:
            print("Downloaded: nothing")
        else:
            print("Downloaded:")
            for section, filename in self.downloaded:
                print(f"• {section} | {filename}")

    def reportError(self, section: str, msg: str):
        """Record that processing ``section`` raised an error described by ``msg``."""
        self.errors.append((section, msg))

    def reportFailed(self, section: str, filename: str):
        """Record that downloading ``filename`` for ``section`` failed."""
        self.failed.append((section, filename))

    def reportFinished(self, section: str, filename: str):
        """Record that ``filename`` was downloaded for ``section``."""
        self.downloaded.append((section, filename))

    def reportLog(self, section: str, filename: str, message: str = None):
        """Append a timestamped line to the log file, if one is open.

        The line ends with ``message``, or ``Downloaded`` when no message is given.
        """
        if self.logFile is not None:
            text = "Downloaded" if message is None else message
            self.logFile.write(f"{datetime.now().isoformat()} [{section}] {filename}: {text}\n")
def main():
    """Command-line entry point.

    Reads the INI file given as positional argument (default
    ``pythomat.ini``), then either lists the names of its rules (``--list``)
    or runs them and prints a report. Exits with a usage error when the
    configuration file does not exist.
    """
    parser = ArgumentParser()
    parser.add_argument("inipath", nargs="?")
    parser.add_argument("--createdirs", action="store_true", help="Automatically create directories")
    parser.add_argument("-l", "--list", action="store_true", help="Display a list of all user-defined rules")
    parser.add_argument("--log", action="store", help="Logs history of downloads to specified path")
    parser.add_argument("-r", "--rules", help="List of rules to run seperated by commas")
    parser.add_argument("--version", action='version', version='%(prog)s 2.0 | https://github.com/muekoeff/pythomat')
    args = parser.parse_args()

    inipath = args.inipath if args.inipath else "pythomat.ini"
    # ConfigParser.read() silently ignores missing files, which would make a
    # mistyped path look like a successful run that downloaded nothing.
    if not os.path.isfile(inipath):
        parser.error(f"Configuration file \"{inipath}\" not found")
    ini = Pythomat.getConfigFromIni(inipath)

    if args.list:
        print(",".join(ini.sections()))
        return

    pythomat = Pythomat()
    pythomat.openLog(args.log)
    try:
        pythomat.downloadFromIni(ini, args.createdirs, args.rules)
        pythomat.printReport()
    finally:
        # Close the log even when processing is interrupted.
        pythomat.closeLog()
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()