Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sourcery Starbot ⭐ refactored evgenytsydenov/python_course #1

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 26 additions & 23 deletions exchanger/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,8 @@ def _extract_email(self, msg: Dict[str, Any]) -> str:
"""
headers = msg['payload']['headers']
sender = next(x for x in headers if x['name'] == 'From')['value']
match = re.search('.*<(?P<email>.*)>.*', sender)
if match:
sender = match.group('email')
if match := re.search('.*<(?P<email>.*)>.*', sender):
sender = match['email']
Comment on lines -250 to +251
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function GmailExchanger._extract_email refactored with the following changes:

logger.debug(f'Sender email "{sender}" was extracted '
f'from the message with id "{msg["id"]}".')
return sender
Expand All @@ -265,10 +264,10 @@ def _extract_lesson_name(self, msg: Dict[str, Any]) -> str:
"""
headers = msg['payload']['headers']
subject = next(x for x in headers if x['name'] == 'Subject')['value']
match = re.search('^(?P<label>.*)/(?P<lesson>.*)$', subject)
les_name = ''
if match:
les_name = match.group('lesson')
if match := re.search('^(?P<label>.*)/(?P<lesson>.*)$', subject):
les_name = match['lesson']
else:
les_name = ''
Comment on lines -268 to +270
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function GmailExchanger._extract_lesson_name refactored with the following changes:

logger.debug(f'Lesson name "{les_name}" extracted '
f'from the message with id "{msg["id"]}".')
return les_name
Expand Down Expand Up @@ -381,18 +380,21 @@ def _get_label_id(self, label_name: str) -> str:
:return: label id.
"""
all_labels = self._gmail.users().labels().list(userId='me').execute()
label_info = {}
for label in all_labels['labels']:
if label['name'] == label_name:
label_info = label
break
label_info = next(
(
label
for label in all_labels['labels']
if label['name'] == label_name
),
{},
)
if label_info:
logger.debug(f'Gmail label "{label_info}" already exists.')
else:
body = {'name': label_name, 'messageListVisibility': 'show',
'labelListVisibility': 'labelShow'}
label_info = self._gmail.users().labels() \
.create(userId='me', body=body).execute()
.create(userId='me', body=body).execute()
Comment on lines -384 to +397
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function GmailExchanger._get_label_id refactored with the following changes:

  • Use the built-in function next instead of a for-loop (use-next)

logger.debug(f'New label "{label_info}" was created.')
return label_info['id']

Expand All @@ -408,23 +410,24 @@ def _create_filter(self, label_id: str, fetch_keyword: str,
"""
# List all filters
filters = self._gmail.users().settings().filters() \
.list(userId='me').execute()
.list(userId='me').execute()

# Find if already exist
criteria = {'to': fetch_alias, 'subject': fetch_keyword}
action = {'addLabelIds': [label_id],
'removeLabelIds': ['INBOX', 'SPAM']}
filter_info = {}
for gmail_filter in filters['filter']:
if (gmail_filter['criteria'] == criteria) \
and (gmail_filter['action'] == action):
filter_info = gmail_filter
break

if filter_info:
if filter_info := next(
(
gmail_filter
for gmail_filter in filters['filter']
if (gmail_filter['criteria'] == criteria)
and (gmail_filter['action'] == action)
),
{},
):
logger.debug(f'Filter {filter_info} already exists.')
else:
body = {'criteria': criteria, 'action': action}
self._gmail.users().settings().filters() \
.create(userId='me', body=body).execute()
.create(userId='me', body=body).execute()
Comment on lines -411 to +432
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function GmailExchanger._create_filter refactored with the following changes:

  • Use the built-in function next instead of a for-loop (use-next)
  • Use named expression to simplify assignment and conditional (use-named-expression)

logger.debug(f'Filter {filter_info} has been created.')
6 changes: 3 additions & 3 deletions exchanger/feedback.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ def _get_success_feedback(self, grade_result: GradeResult) -> (str, str):
"""
timestamp = grade_result.timestamp.strftime(DATE_FORMAT)
subject = f'{self._course_name} / {grade_result.lesson_name} ' \
f'/ {timestamp}'
score = sum([task.score for task in grade_result.task_grades])
max_score = sum([task.max_score for task in grade_result.task_grades])
f'/ {timestamp}'
score = sum(task.score for task in grade_result.task_grades)
max_score = sum(task.max_score for task in grade_result.task_grades)
Comment on lines -78 to +80
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function FeedbackCreator._get_success_feedback refactored with the following changes:

body = self._grades_body.format(
first_name=grade_result.first_name,
lesson_name=grade_result.lesson_name,
Expand Down
12 changes: 6 additions & 6 deletions grader/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ def get_lesson_info(self, lesson_name: str) -> Dict[str, Any]:
les_tab = self._meta.tables['assignment']
q_filter = func.lower(les_tab.c.name) == lesson_name.lower()
lesson_sql = select(['*']).where(q_filter)
result = connection.execute(lesson_sql).first()
les_info = {}
if result:
if result := connection.execute(lesson_sql).first():
les_info = dict(result)
else:
les_info = {}
Comment on lines -71 to +74
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function DatabaseHandler.get_lesson_info refactored with the following changes:

logger.debug(f'The following information about lesson with '
f'name "{lesson_name}" was loaded: {les_info}.')
return les_info
Expand All @@ -87,10 +87,10 @@ def get_user_info(self, email: str) -> Dict[str, Any]:
users_table = self._meta.tables['student']
q_filter = func.lower(users_table.c.email) == email.lower()
user_sql = select(['*']).where(q_filter)
result = connection.execute(user_sql).first()
user_info = {}
if result:
if result := connection.execute(user_sql).first():
user_info = dict(result)
else:
user_info = {}
Comment on lines -90 to +93
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function DatabaseHandler.get_user_info refactored with the following changes:

logger.debug(f'The following information about user with '
f'email "{email}" was loaded: {user_info}.')
return user_info
Expand Down
26 changes: 12 additions & 14 deletions grader/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,18 +225,19 @@ def _is_notebook_valid(self, path: str, lesson_name: str) -> bool:
all_cells = json.load(file).get('cells', [])

# Get only nbgrader cells
for cell in all_cells:
if 'nbgrader' in cell['metadata'].keys():
nb_cells.append(
cell['metadata']['nbgrader'].get('grade_id'))
nb_cells.extend(
cell['metadata']['nbgrader'].get('grade_id')
for cell in all_cells
if 'nbgrader' in cell['metadata'].keys()
)
Comment on lines -228 to +232
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function Grader._is_notebook_valid refactored with the following changes:

except (JSONDecodeError, UnicodeDecodeError):
logger.debug(f'File "{path}" does not have a json structure.')
return False

# Get valid cells
with self._nb_grader.gradebook as gb:
origin_cells = gb.find_notebook(lesson_name, lesson_name) \
.source_cells
.source_cells
true_cells = [cell.name for cell in origin_cells]
return Counter(nb_cells) == Counter(true_cells)

Expand Down Expand Up @@ -306,20 +307,17 @@ def _get_lesson_tasks(self, lesson_name: str) -> List[Task]:
tasks = []
task = None
for cell in notebook['cells']:
nb_data = cell['metadata'].get('nbgrader')
if nb_data:
if nb_data := cell['metadata'].get('nbgrader'):
# If it is a task name cell
if cell['cell_type'] == 'markdown':
is_todo = re.match(pat, cell['source'][0])
if is_todo:
task = Task(name=is_todo.group('name'))
if is_todo := re.match(pat, cell['source'][0]):
task = Task(name=is_todo['name'])
continue

# If it is a test cell
if cell['cell_type'] == 'code':
if nb_data['grade']:
task.test_cell = nb_data['grade_id']
tasks.append(task)
if cell['cell_type'] == 'code' and nb_data['grade']:
task.test_cell = nb_data['grade_id']
tasks.append(task)
Comment on lines -309 to +320
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function Grader._get_lesson_tasks refactored with the following changes:

logger.debug(f'Task names were extracted for lesson "{lesson_name}".')
return tasks

Expand Down
57 changes: 27 additions & 30 deletions publisher/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ def connect(self) -> None:

# Get id of cloud folder
query = f"mimeType = 'application/vnd.google-apps.folder' " \
f"and name = '{self.cloud_root_name}' " \
f"and trashed != True"
cloud_file_id = self._find_cloud_files(query, attributes=['id'])
if not cloud_file_id:
f"and name = '{self.cloud_root_name}' " \
f"and trashed != True"
if cloud_file_id := self._find_cloud_files(query, attributes=['id']):
self._cloud_root_id = cloud_file_id[0]['id']
else:
raise ValueError('The root cloud folder for publishing must be '
'pre created.')
self._cloud_root_id = cloud_file_id[0]['id']
Comment on lines -43 to -49
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function GDrivePublisher.connect refactored with the following changes:


def _create_cloud_path(self, cloud_path: str) -> str:
"""Create all intermediate folders on the way to the cloud path.
Expand All @@ -60,12 +60,13 @@ def _create_cloud_path(self, cloud_path: str) -> str:
return parent_id
for name in cloud_path.split(os.sep):
query = f"name = '{name}' and '{parent_id}' in parents " \
f"and trashed != True"
f"and trashed != True"
folder = self._find_cloud_files(query, ['id'])
if not folder:
parent_id = self._create_cloud_folder(name, parent_id)
else:
parent_id = folder[0]['id']
parent_id = (
folder[0]['id']
if folder
else self._create_cloud_folder(name, parent_id)
)
Comment on lines -63 to +69
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function GDrivePublisher._create_cloud_path refactored with the following changes:

return parent_id

def sync(self, local_file: str, cloud_folder_path: str = '.',
Expand Down Expand Up @@ -96,17 +97,16 @@ def sync(self, local_file: str, cloud_folder_path: str = '.',

# Find the file and sync it
query = f"name = '{local_name}' and '{parent_id}' in parents " \
f"and trashed != True"
cloud_file = self._find_cloud_files(query, ['id'])
if not cloud_file:
logger.debug(f'File or folder "{local_name}" does not exist '
f'in the cloud path "{cloud_folder_path}".')
cloud_file_id = self._upload_file(local_file, parent_id)
else:
f"and trashed != True"
if cloud_file := self._find_cloud_files(query, ['id']):
cloud_file_id = cloud_file[0]['id']
logger.debug(f'File or folder "{local_name}" exists '
f'in the cloud path "{cloud_folder_path}".')
self._update_cloud_file(cloud_file_id, local_file)
else:
logger.debug(f'File or folder "{local_name}" does not exist '
f'in the cloud path "{cloud_folder_path}".')
cloud_file_id = self._upload_file(local_file, parent_id)
Comment on lines -99 to +109
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function GDrivePublisher.sync refactored with the following changes:

logger.debug(f'Content of the local object "{local_file}" was '
f'synchronized with the cloud file "{cloud_name}".')

Expand All @@ -115,15 +115,15 @@ def sync(self, local_file: str, cloud_folder_path: str = '.',
file_params = self._get_cloud_file(
cloud_file_id, ['permissions', 'name'])
file_params[link_type] = f'http://drive.google.com/' \
f'thumbnail?id={cloud_file_id}'
f'thumbnail?id={cloud_file_id}'
else:
file_params = self._get_cloud_file(
cloud_file_id, ['permissions', link_type, 'name'])
if to_share:
is_shared = False
for p in file_params['permissions']:
if (p['id'] == 'anyoneWithLink') and (p['role'] == 'reader'):
is_shared = True
is_shared = any(
(p['id'] == 'anyoneWithLink') and (p['role'] == 'reader')
for p in file_params['permissions']
)
if is_shared:
logger.debug(f'File or folder "{file_params["name"]}" with '
f'id "{cloud_file_id}" is already shared.')
Expand Down Expand Up @@ -297,18 +297,15 @@ def _find_cloud_files(self, query: str, attributes: Iterable[str]) \
results['files'] += next_page['files']
return results['files']

def _get_cloud_file(self, file_id: str, attributes: Iterable[str]) \
-> Dict[str, Any]:
def _get_cloud_file(self, file_id: str, attributes: Iterable[str]) -> Dict[str, Any]:
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function GDrivePublisher._get_cloud_file refactored with the following changes:

"""Get attributes of cloud file.

:param file_id: if of file.
:param attributes: attributes to get.
:return: dict with attributes.
"""
fields = f'{", ".join(attributes)}'
results = self._gdrive.files().get(
fileId=file_id, fields=fields).execute()
return results
return self._gdrive.files().get(fileId=file_id, fields=fields).execute()

def _get_ignore_files(self) -> List[str]:
"""Get file patterns to ignore when publishing.
Expand All @@ -318,8 +315,8 @@ def _get_ignore_files(self) -> List[str]:
patterns = []
path = os.path.join(os.path.dirname(__file__), '.publishignore')
with open(path, 'r') as file:
for line in file.readlines():
if line.strip() and not line.startswith('#'):
patterns.append(line)
patterns.extend(
line for line in file if line.strip() and not line.startswith('#')
)
Comment on lines -321 to +320
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function GDrivePublisher._get_ignore_files refactored with the following changes:

logger.debug('Publish ignore files were loaded.')
return patterns
4 changes: 1 addition & 3 deletions utils/app_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ def formatTime(self, record: logging.LogRecord,
:return: time in string format.
"""
dt = self.converter(record.created)
if datefmt:
return dt.strftime(datefmt)
return dt.isoformat()
return dt.strftime(datefmt) if datefmt else dt.isoformat()
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function CustomFormatter.formatTime refactored with the following changes:



def _get_file_handler(path: str) -> logging.FileHandler:
Expand Down
5 changes: 3 additions & 2 deletions utils/smtp_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ def send(self, destination: str, subject: str,
file_attachment.set_payload(file_obj.read())
encoders.encode_base64(file_attachment)
file_attachment.add_header(
f'Content-Disposition',
f'attachment; filename="{file_name}"')
'Content-Disposition',
f'attachment; filename="{file_name}"',
)
Comment on lines -61 to +63
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function SMTPSender.send refactored with the following changes:

message.attach(file_attachment)
text = message.as_string()

Expand Down