Change ThreadPool and Worker class in thread_pool.py #628
LGTM thanks @nnehdi
minio/thread_pool.py
Outdated
@@ -61,8 +62,7 @@ def __init__(self, num_threads):
        self.results_queue = queue()
        self.exceptions_queue = queue()
        self.tasks_queue = queue(num_threads)
@nnehdi you need to remove the queue size restriction here, otherwise multipart PUT won't work. The reason is that the code in this PR fills tasks_queue with all tasks before starting to execute them.
I've tested the existing code (without @vadmeste's suggested correction above) and verified that multi-part uploads fail when more than num_threads parts are involved. That is, when the size of tasks_queue is limited to num_threads (3 in our case), the multi-part upload hangs for files bigger than 15MB (3 x 5MB, the multi-part size limit): tasks_queue is full once the 3rd part has been put in, so the tasks_queue.put call for the 4th part blocks the operation.
So, we should not limit the size of tasks_queue:
self.tasks_queue = queue(num_threads)
=> self.tasks_queue = queue()
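The blocking behaviour described above can be reproduced in isolation. The following is a minimal sketch, assuming `queue` in the PR refers to the standard library's `queue.Queue` and that no worker is consuming while parts are enqueued. The helper `enqueue_all` is hypothetical and not part of minio-py. It uses `put_nowait` so that the full queue shows up as a `queue.Full` exception rather than a hang.

```python
import queue


def enqueue_all(tasks_queue, num_parts):
    """Enqueue parts with no consumer running; return how many were accepted.

    Hypothetical helper for illustration only.
    """
    accepted = 0
    for part in range(num_parts):
        try:
            # put_nowait raises queue.Full where a plain put() would block
            tasks_queue.put_nowait(part)
            accepted += 1
        except queue.Full:
            break
    return accepted


num_threads = 3
# Bounded queue: the 4th part does not fit, which is where put() would hang.
bounded = enqueue_all(queue.Queue(num_threads), 4)
# Unbounded queue: every part is accepted before any worker starts.
unbounded = enqueue_all(queue.Queue(), 4)
print(bounded, unbounded)
```

With a maximum size of `num_threads`, only three parts are accepted; with no size argument, all four are.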
@ebozduman @vadmeste I agree with you; I did not consider that. As @ebozduman said, we should remove the queue size limit because we put all chunks of a file into the queue before we start the Worker threads.
Thank you, guys.
Thanks for this PR @nnehdi, one important comment needs to be addressed.
minio/thread_pool.py
Outdated
@@ -37,12 +37,13 @@ def __init__(self, tasks_queue, results_queue, exceptions_queue):
         self.tasks_queue = tasks_queue
         self.results_queue = results_queue
         self.exceptions_queue = exceptions_queue
-        self.daemon = True
+        self.daemon = False
Why do we turn off daemon mode (self.daemon = False)?
It depends on your situation. In our use case, we want to close the thread immediately after the program stops, and it's not important for us if an in-progress upload is lost. I am changing this flag back to its previous value (True), but it would be better to have a config option that lets users set this flag depending on their situation when using minio-py.
Thanks
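As a rough illustration of the configurable flag suggested above, the sketch below passes a `daemon` argument through to each worker thread. The `daemon` parameter, the `make_workers` helper, and this simplified `Worker` are assumptions made for the example; they are not part of the minio-py API. In Python, the interpreter exits without waiting for daemon threads, while non-daemon threads keep the process alive until they finish.

```python
import threading


class Worker(threading.Thread):
    """Simplified worker; the daemon parameter is a hypothetical addition."""

    def __init__(self, daemon=True):
        super().__init__()
        # daemon=True: thread is abandoned when the main program exits
        # daemon=False: interpreter waits for the thread before exiting
        self.daemon = daemon

    def run(self):
        pass  # a real worker would pull tasks from a queue here


def make_workers(num_threads, daemon=True):
    """Create (but do not start) workers that share one daemon setting."""
    return [Worker(daemon=daemon) for _ in range(num_threads)]


default_workers = make_workers(3)
waiting_workers = make_workers(3, daemon=False)
print([w.daemon for w in default_workers], [w.daemon for w in waiting_workers])
```

Defaulting to `True` keeps the behaviour the reviewers asked to restore, while callers who cannot afford to lose an upload could opt in to `False`.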
@nnehdi,
Thank you very much for working on this fix.
Are you going to push these review comment changes, so that we can merge and close the PR?
Change Worker daemon field to True. remove queue self.task_queue size restriction in ThreadPool class. Fixes minio#595
minio/thread_pool.py
Outdated
        # Wait for completion of all the tasks in the queue
        self.tasks_queue.join()
        # Check if one of the thread raised an exception, if yes
        # raise it here in the function
        if not self.exceptions_queue.empty():
            raise self.exceptions_queue.get()
Could you also clean up these last 2 extra lines introduced?
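For context, the join-then-raise pattern in the hunk above can be sketched end to end as follows. This is a simplified, self-contained approximation written for illustration, not the actual minio/thread_pool.py: the `run_all` function is hypothetical, tasks are assumed to be `(func, args)` tuples, and the `fast_quit` handling from the PR is omitted.

```python
import queue
import threading


class Worker(threading.Thread):
    """Pulls (func, args) tasks until the queue is empty."""

    def __init__(self, tasks_queue, results_queue, exceptions_queue):
        super().__init__()
        self.tasks_queue = tasks_queue
        self.results_queue = results_queue
        self.exceptions_queue = exceptions_queue
        self.daemon = True

    def run(self):
        while True:
            try:
                func, args = self.tasks_queue.get_nowait()
            except queue.Empty:
                return  # all tasks were enqueued before start, so we are done
            try:
                self.results_queue.put(func(*args))
            except Exception as exc:
                # Hand the exception to the caller instead of losing it
                self.exceptions_queue.put(exc)
            finally:
                # Always mark the task done so tasks_queue.join() can return
                self.tasks_queue.task_done()


def run_all(tasks, num_threads=3):
    """Hypothetical driver: enqueue everything, start workers, then wait."""
    tasks_queue = queue.Queue()  # unbounded, as agreed in the review
    results_queue = queue.Queue()
    exceptions_queue = queue.Queue()
    for task in tasks:
        tasks_queue.put(task)
    for _ in range(num_threads):
        Worker(tasks_queue, results_queue, exceptions_queue).start()
    # Wait for completion of all the tasks in the queue
    tasks_queue.join()
    # Re-raise the first exception recorded by a worker, if any
    if not exceptions_queue.empty():
        raise exceptions_queue.get()
    return results_queue
```

Calling `run_all([(pow, (2, 3)), (pow, (3, 2))])` returns a queue holding both results in completion order, and a task that raises surfaces its exception in the calling thread after `join()` returns.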
minio/thread_pool.py
Outdated
        # Wait for completion of all the tasks in the queue
        self.tasks_queue.join()
        # Check if one of the thread raised an exception, if yes
        # raise it here in the function
        if not self.exceptions_queue.empty():
            raise self.exceptions_queue.get()
Could you also clean up these last 2 extra space lines introduced?
minio/thread_pool.py
Outdated
@@ -42,7 +42,8 @@ def __init__(self, tasks_queue, results_queue, exceptions_queue):

    def run(self):
        fast_quit = False
        while True:
We don't need this extra line.
minio/thread_pool.py
Outdated
@@ -83,3 +87,4 @@ def result(self):
        """ Return the result of all called tasks """
        return self.results_queue
We don't need this extra line.
Could you also squash the commits when you are done?
LGTM
LGTM & tested
Fixes #595