Parallelize Python Dictionary Comprehension
I'm trying to parallelize the subsetting of a Python dictionary. The code below creates a new dictionary, positions_sub, based on whether the keys of the positions dictionary are found in a list, node_list:
positions_sub = {}
for k, v in positions.items():
    if k in node_list:
        positions_sub[k] = v
This code works just fine and does exactly what I want. However, it takes a while to run, so I'm trying to parallelize it. I attempted this in the code below, but it returns positions_sub as a list of dictionaries, which isn't what I want. There are also some issues with the number of values per key. Any ideas how to get this working? Thanks!
from joblib import Parallel, delayed

def dict_filter(k, v):
    if k in node_list:
        positions_sub[k] = v
    return positions_sub

positions_sub = Parallel(n_jobs=-1)(delayed(dict_filter)(k, v) for k, v in positions.items())
Solution 1:[1]
Before you resort to parallelization you should make sure you are using the right data structure for each task: remember that x in list is essentially O(n), whereas x in set (and also x in dict) is more like O(1). Therefore, just converting your node_list to a set can improve the performance tremendously.
node_list = set(node_list)
positions_sub = {}
for k, v in positions.items():
    if k in node_list:
        positions_sub[k] = v
Another thing to consider is the ratio between len(positions) and len(node_list). If one is substantially smaller than the other, you should always iterate over the smaller one.
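For example, if node_list is much smaller than positions, you can build the result by iterating over node_list and looking each key up in positions (dict lookups are O(1) on average); this is exactly what variant3 in the comparison below does:

positions_sub = {k: positions[k] for k in node_list if k in positions}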
EDIT: some code for performance comparisons
import random
import timeit
import functools

def generate(n_positions=1000, n_node_list=100):
    positions = {i: i for i in random.sample(range(n_positions), n_positions)}
    node_list = random.sample(range(max(n_positions, n_node_list)), n_node_list)
    return positions, node_list

def validate(variant):
    data = generate(1000, 100)
    if sorted(data[1]) != sorted(k for k in variant(*data)):
        raise Exception(f"{variant.__name__} failed")

def measure(variant, data, repeats=1000):
    total_seconds = timeit.Timer(functools.partial(variant, *data)).timeit(repeats)
    average_ms = total_seconds / repeats * 1000
    print(f"{variant.__name__:10s} took an average of {average_ms:0.2f}ms per pass over {repeats} passes")

def variant1(positions, node_list):
    positions_sub = {}
    for k, v in positions.items():
        if k in node_list:
            positions_sub[k] = v
    return positions_sub

def variant1b(positions, node_list):
    node_list = set(node_list)
    positions_sub = {}
    for k, v in positions.items():
        if k in node_list:
            positions_sub[k] = v
    return positions_sub

def variant2(positions, node_list):
    return {k: v for k, v in positions.items() if k in node_list}

def variant2b(positions, node_list):
    node_list = set(node_list)
    return {k: v for k, v in positions.items() if k in node_list}

def variant3(positions, node_list):
    return {k: positions[k] for k in node_list if k in positions}

if __name__ == "__main__":
    variants = [variant1, variant1b, variant2, variant2b, variant3]
    for variant in variants:
        validate(variant)

    n_positions = 4000
    n_node_list = 1000
    n_repeats = 100
    data = generate(n_positions, n_node_list)
    print(f"data generated with len(positions)={n_positions} and len(node_list)={n_node_list}")
    for variant in variants:
        measure(variant, data, n_repeats)
EDIT2: as requested, here are some results from my machine.
first run:
data generated with len(positions)=4000 and len(node_list)=1000
variant1 took an average of 6.90ms per pass over 100 passes
variant1b took an average of 0.22ms per pass over 100 passes
variant2 took an average of 6.95ms per pass over 100 passes
variant2b took an average of 0.12ms per pass over 100 passes
variant3 took an average of 0.19ms per pass over 100 passes
second run:
data generated with len(positions)=40000 and len(node_list)=10000
variant1 took an average of 738.23ms per pass over 10 passes
variant1b took an average of 2.04ms per pass over 10 passes
variant2 took an average of 739.51ms per pass over 10 passes
variant2b took an average of 1.52ms per pass over 10 passes
variant3 took an average of 1.85ms per pass over 10 passes
Note that n=len(positions) and m=len(node_list) have been selected such that the ratio n/m=4 is roughly equivalent to that of the original data, which the OP specified as 1.2M for n and 300K for m.

Observe the effect of scaling up by a factor of 10 from the first to the second run: where in the first run variant1b is about 31 times faster than variant1, in the second run it is about 361 times faster! This is the expected result of reducing the complexity of the k in node_list check from O(m) to O(1). The total time complexity of variant1 is n*m = 0.25*n^2 = O(n^2), whereas variant1b has only n*1 = O(n). This means that for every order of magnitude that n increases, variant1b is also an order of magnitude faster than variant1.
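As a quick sanity check against the measurements above: scaling both n and m by a factor of 10 should multiply variant1's O(n*m) runtime by roughly 100 (6.90ms -> ~690ms; 738.23ms observed), but variant1b's O(n) runtime only by roughly 10 (0.22ms -> ~2.2ms; 2.04ms observed), which agrees well with the results.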
That a similar performance improvement can be achieved by parallelization alone is rather doubtful: by and large, the expected gain for an embarrassingly parallel problem is at most a multiple of the available CPUs, which is still a constant factor and nowhere near the gain of improving the algorithm from O(n^2) to O(n).

Also, while IMHO the given problem does fall into the class of embarrassingly parallel problems, the output must still be aggregated after the parallel processing before it can be used, as sketched below. Furthermore, I'm quite unfamiliar with joblib, which is why I have skipped adding it to the comparison.
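For illustration only, here is a minimal sketch of what such a split-then-aggregate approach could look like with joblib; the chunk size and the helper names chunked and filter_chunk are arbitrary placeholders of mine, and for this workload the inter-process overhead will most likely not beat the simple set-based loop above:

from itertools import islice
from joblib import Parallel, delayed

def chunked(iterable, size):
    # Yield successive lists of at most `size` items from `iterable`.
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk

def filter_chunk(chunk, node_set):
    # Each worker filters its own chunk and returns a small dictionary.
    return {k: v for k, v in chunk if k in node_set}

node_set = set(node_list)
partials = Parallel(n_jobs=-1)(
    delayed(filter_chunk)(chunk, node_set)
    for chunk in chunked(positions.items(), 100_000)
)

# Aggregation step: merge the per-worker dictionaries into one result.
positions_sub = {}
for part in partials:
    positions_sub.update(part)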
Solution 2:[2]
You can use asyncio. (Documentation can be found at https://docs.python.org/3/library/asyncio.html.) It is used as a foundation for multiple Python asynchronous frameworks that provide high-performance network and web servers, database connection libraries, distributed task queues, etc. Plus, it has both high-level and low-level APIs to accommodate any kind of problem.
import asyncio

def background(f):
    def wrapped(*args, **kwargs):
        # Hand f off to the default ThreadPoolExecutor. Note that
        # run_in_executor only forwards positional arguments, so calling
        # the wrapped function with keyword arguments would raise a TypeError.
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)
    return wrapped

@background
def your_function(argument):
    #code
Now this function will be dispatched to a background thread whenever it is called, without putting the main program into a wait state. You can use it to parallelize a for loop as well: the loop itself is still sequential, but every iteration is handed off to the executor and runs concurrently with the main program as soon as the interpreter gets there.

For your specific case you can do:
import asyncio
import time

def background(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)
    return wrapped

@background
def add_to_dictionary(k, v):
    time.sleep(1)  # Added sleep to better demonstrate parallelization
    print(f"function called for {k=}\n", end='')
    if k in node_list:
        positions_sub[k] = v

# Random data to demonstrate parallelization
positions = {i: i for i in range(20)}
node_list = [key for key in positions if not key % 3 or not key % 5]
print(f"{positions=}, {node_list=}")

positions_sub = dict()

loop = asyncio.get_event_loop()  # Get the event loop
looper = asyncio.gather(*[add_to_dictionary(k, v) for k, v in positions.items()])

# Run the loop
results = loop.run_until_complete(looper)  # Wait until finished
print('loop finished')
print(f"{positions_sub=}")
This produces the following output:
positions={0: 0, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7, 8: 8, 9: 9, 10: 10, 11: 11, 12: 12, 13: 13, 14: 14, 15: 15, 16: 16, 17: 17, 18: 18, 19: 19},
node_list=[0, 3, 5, 6, 9, 10, 12, 15, 18]
function called for k=0
function called for k=6
function called for k=5
function called for k=4
function called for k=2
function called for k=1
function called for k=3
function called for k=7
function called for k=11
function called for k=10
function called for k=8
function called for k=15
function called for k=14
function called for k=12
function called for k=9
function called for k=13
function called for k=19
function called for k=18
function called for k=17
function called for k=16
loop finished
positions_sub={3: 3, 6: 6, 5: 5, 0: 0, 10: 10, 15: 15, 9: 9, 12: 12, 18: 18}
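The workers above mutate the shared positions_sub dictionary directly, so the results collected by gather are not used. As a purely illustrative variation (check_key is a hypothetical name, reusing background, positions, and node_list from the snippet above), each task could instead return its key/value pair and the dictionary could be built from the gathered results in the main program:

@background
def check_key(k, v):
    # Return the pair if the key should be kept, otherwise None.
    return (k, v) if k in node_list else None

loop = asyncio.get_event_loop()
results = loop.run_until_complete(
    asyncio.gather(*[check_key(k, v) for k, v in positions.items()])
)
positions_sub = dict(pair for pair in results if pair is not None)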
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | user3666197 |
| Solution 2 | Hamza |