Multiprocessing: use tqdm to display a progress bar
To make my code more "pythonic" and faster, I use multiprocessing and a map function to send it (a) the function and (b) the range of iterations.
The obvious solution, i.e. calling tqdm directly on the range with tqdm.tqdm(range(0, 30)), does not work with multiprocessing (as formulated in the code below).
The progress bar runs from 0 to 100% immediately (presumably when Python reads the code?), but it does not indicate the actual progress of the map function.
How can I display a progress bar that indicates which step the map function is at?
from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    p = Pool(2)
    r = p.map(_foo, tqdm.tqdm(range(0, 30)))
    p.close()
    p.join()
Any help or suggestions are welcome...
Solution 1:[1]
Solution found. Be careful! Due to multiprocessing, the estimated times (iterations per loop, total time, etc.) can be unstable, but the progress bar works perfectly.
Note: the context manager for Pool is only available in Python 3.3+.
from multiprocessing import Pool
import time
from tqdm import *

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    with Pool(processes=2) as p:
        max_ = 30
        with tqdm(total=max_) as pbar:
            for _ in p.imap_unordered(_foo, range(0, max_)):
                pbar.update()
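For Python versions older than 3.3, where Pool cannot be used as a context manager, the same pattern works with an explicit close/join. A minimal sketch of the equivalent:

from multiprocessing import Pool
import time
from tqdm import tqdm

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    p = Pool(processes=2)
    max_ = 30
    with tqdm(total=max_) as pbar:
        for _ in p.imap_unordered(_foo, range(0, max_)):
            pbar.update()  # one tick per completed task
    p.close()
    p.join()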
Solution 2:[2]
Use imap instead of map, which returns an iterator of the processed values.
from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    with Pool(2) as p:
        r = list(tqdm.tqdm(p.imap(_foo, range(30)), total=30))
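Because imap yields results one at a time, its per-item overhead can dominate when individual tasks are cheap. Pool.imap accepts a chunksize argument (part of the standard library API) that batches items per worker round trip; a sketch of the same example with batching:

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    with Pool(2) as p:
        # chunksize=5 dispatches items to workers in batches of 5,
        # so the bar advances in bursts as each batch returns.
        r = list(tqdm.tqdm(p.imap(_foo, range(30), chunksize=5), total=30))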
Solution 3:[3]
Sorry for being late, but if all you need is a concurrent map, I added this functionality in tqdm>=4.42.0:
from tqdm.contrib.concurrent import process_map  # or thread_map
import time

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    r = process_map(_foo, range(0, 30), max_workers=2)
References: https://tqdm.github.io/docs/contrib.concurrent/ and https://github.com/tqdm/tqdm/blob/master/examples/parallel_bars.py
It supports max_workers and chunksize, and you can also easily switch from process_map to thread_map.
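To illustrate the switch, a minimal sketch using thread_map, which has the same call shape but runs the work in threads (better suited to I/O-bound tasks, and it avoids pickling the arguments):

from tqdm.contrib.concurrent import thread_map
import time

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    # Same call shape as process_map; threads work well here because
    # time.sleep releases the GIL, as real I/O would.
    r = thread_map(_foo, range(0, 30), max_workers=2)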
Solution 4:[4]
You can use p_tqdm instead.
https://github.com/swansonk14/p_tqdm
from p_tqdm import p_map
import time

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    r = p_map(_foo, list(range(0, 30)))
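If you need to cap the number of workers, the p_tqdm README documents a num_cpus argument; a hedged sketch (verify against the version you install):

from p_tqdm import p_map
import time

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    # num_cpus (per the p_tqdm README) limits the worker process count.
    r = p_map(_foo, list(range(0, 30)), num_cpus=2)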
Solution 5:[5]
Based on the answer of Xavi Martínez, I wrote the function imap_unordered_bar. It can be used in the same way as imap_unordered, with the only difference that a progress bar is shown.
from multiprocessing import Pool
import time
from tqdm import *

def imap_unordered_bar(func, args, n_processes=2):
    p = Pool(n_processes)
    res_list = []
    with tqdm(total=len(args)) as pbar:
        # Note: do not wrap this loop in a second tqdm; that would draw
        # a spurious extra bar on top of pbar.
        for res in p.imap_unordered(func, args):
            pbar.update()
            res_list.append(res)
    p.close()
    p.join()
    return res_list

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    result = imap_unordered_bar(_foo, range(5))
Solution 6:[6]
import multiprocessing as mp
import tqdm

iterable = ...  # your sequence of work items
num_cpu = mp.cpu_count() - 2  # don't use all CPUs

def func(item):
    # your logic
    ...

if __name__ == '__main__':
    with mp.Pool(num_cpu) as p:
        list(tqdm.tqdm(p.imap(func, iterable), total=len(iterable)))
Solution 7:[7]
For a progress bar with apply_async, we can use the following code, as suggested in https://github.com/tqdm/tqdm/issues/484:
import time
import random
from multiprocessing import Pool
from tqdm import tqdm

def myfunc(a):
    time.sleep(random.random())
    return a ** 2

if __name__ == '__main__':
    pool = Pool(2)
    pbar = tqdm(total=100)

    def update(*a):
        # called in the parent process once per finished task
        pbar.update()

    for i in range(pbar.total):
        pool.apply_async(myfunc, args=(i,), callback=update)
    pool.close()
    pool.join()
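One caveat with the callback approach: if a task raises, the success callback never fires and the bar stalls short of 100%. apply_async also takes an error_callback (a standard library parameter) that can advance the bar on failure as well. A sketch:

import time
import random
from multiprocessing import Pool
from tqdm import tqdm

def myfunc(a):
    time.sleep(random.random())
    return a ** 2

if __name__ == '__main__':
    pool = Pool(2)
    pbar = tqdm(total=100)

    def update(*a):
        pbar.update()

    def on_error(e):
        pbar.update()  # count failed tasks too, so the bar still reaches 100%

    for i in range(pbar.total):
        pool.apply_async(myfunc, args=(i,), callback=update, error_callback=on_error)
    pool.close()
    pool.join()
    pbar.close()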
Solution 8:[8]
Here is my take for when you need to get results back from your parallel executing functions. This function does a few things (there is another post of mine that explains it further), but the key point is that there is a queue of pending tasks and a queue of completed tasks. As workers finish each task from the pending queue, they add the result to the completed queue. You can wrap the check on the completed-tasks queue with the tqdm progress bar. I am not including the implementation of the do_work() function here, as it is not relevant; the point is to monitor the completed-tasks queue and update the progress bar every time a result comes in.
import multiprocessing as mp
import pickle

import psutil
from tqdm import tqdm

SENTINEL = None  # end-of-work marker (not defined in the original snippet; any unique value works)

def par_proc(job_list, num_cpus=None, verbose=False):
    # Get the number of cores
    if not num_cpus:
        num_cpus = psutil.cpu_count(logical=False)

    print('* Parallel processing')
    print('* Running on {} cores'.format(num_cpus))

    # Set up the queues for sending and receiving data to/from the workers
    tasks_pending = mp.Queue()
    tasks_completed = mp.Queue()

    # Gather processes and results here
    processes = []
    results = []

    # Count tasks
    num_tasks = 0

    # Add the tasks to the queue
    for job in job_list:
        for task in job['tasks']:
            expanded_job = {}
            num_tasks = num_tasks + 1
            expanded_job.update({'func': pickle.dumps(job['func'])})
            expanded_job.update({'task': task})
            tasks_pending.put(expanded_job)

    # Set the number of workers here
    num_workers = min(num_cpus, num_tasks)

    # We need as many sentinels as there are worker processes so that ALL
    # processes exit when there is no more work left to be done.
    for c in range(num_workers):
        tasks_pending.put(SENTINEL)

    print('* Number of tasks: {}'.format(num_tasks))

    # Set up and start the workers
    for c in range(num_workers):
        p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed, verbose))
        p.name = 'worker' + str(c)
        processes.append(p)
        p.start()

    # Gather the results
    completed_tasks_counter = 0
    with tqdm(total=num_tasks) as bar:
        while completed_tasks_counter < num_tasks:
            results.append(tasks_completed.get())
            completed_tasks_counter = completed_tasks_counter + 1
            bar.update()  # advance by one per result (update takes an increment, not a running total)

    for p in processes:
        p.join()

    return results
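For completeness, a worker consistent with the queue protocol above could look like the following. This is a hypothetical sketch, not the original author's do_work(): it drains the pending queue until it sees the sentinel, unpickles the function, and pushes each result onto the completed queue.

def do_work(tasks_pending, tasks_completed, verbose):
    # Hypothetical worker loop matching par_proc's protocol above.
    while True:
        job = tasks_pending.get()
        if job is SENTINEL:  # no more work: exit so par_proc can join us
            break
        func = pickle.loads(job['func'])
        tasks_completed.put(func(job['task']))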
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | richardec |
| Solution 2 | one |
| Solution 3 | |
| Solution 4 | Victor Quach |
| Solution 5 | Oliver Wilken |
| Solution 6 | |
| Solution 7 | |
| Solution 8 | Nick B. |