Parallelize Python Dictionary Comprehension

I'm trying to parallelize the subsetting of a Python dictionary. The code below creates a new dictionary, positions_sub, containing only the entries of the positions dictionary whose keys are found in a list, node_list:

positions_sub = {}
for k,v in positions.items():
    if k in node_list:
        positions_sub[k] = v

This code works just fine and does exactly what I want. However, it takes a while to run, so I'm trying to parallelize it. My attempt below returns positions_sub as a list of dictionaries, which isn't what I want. There are also some issues with the number of values per key. Any ideas how to get this working? Thanks!

from joblib import Parallel, delayed

def dict_filter(k,v):
    if k in node_list:
        positions_sub[k] = v
    return positions_sub
positions_sub = Parallel(n_jobs=-1)(delayed(dict_filter)(k, v) for k, v in positions.items())


Solution 1:[1]

Before you resort to parallelization, make sure you are using the right data structure for the task: remember that x in list is O(n), whereas x in set (and also x in dict) is O(1) on average. Therefore, just converting your node_list to a set can improve performance tremendously.

node_list = set(node_list)
positions_sub = {}
for k,v in positions.items():
    if k in node_list:
        positions_sub[k] = v

Another thing to consider is the ratio between len(positions) and len(node_list). If one is substantially smaller than the other, you should iterate over the smaller one, as in the sketch below.
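For example, if node_list is the smaller collection, loop over it and look the keys up in positions instead (this is exactly what variant3 in the benchmark below does):

positions_sub = {k: positions[k] for k in node_list if k in positions}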


EDIT: some code for performance comparisons

import random
import timeit
import functools

def generate(n_positions=1000, n_node_list=100):
    positions = { i:i for i in random.sample(range(n_positions), n_positions) }
    node_list = random.sample(range(max(n_positions, n_node_list)), n_node_list)
    return positions, node_list  

def validate(variant):
    data = generate(1000, 100)
    if sorted(data[1]) != sorted(k for k in variant(*data)):
        raise Exception(f"{variant.__name__} failed")

def measure(variant, data, repeats=1000):
    total_seconds = timeit.Timer(functools.partial(variant, *data)).timeit(repeats)
    average_ms = total_seconds / repeats * 1000
    print(f"{variant.__name__:10s} took an average of {average_ms:0.2f}ms per pass over {repeats} passes" )   


def variant1(positions, node_list):
    # original approach: 'k in node_list' is an O(m) list scan per key
    positions_sub = {}
    for k, v in positions.items():
        if k in node_list:
            positions_sub[k] = v
    return positions_sub

def variant1b(positions, node_list):
    # same loop, but with node_list converted to a set: O(1) lookups
    node_list = set(node_list)
    positions_sub = {}
    for k, v in positions.items():
        if k in node_list:
            positions_sub[k] = v
    return positions_sub

def variant2(positions, node_list):
    # dict comprehension, still with O(m) list lookups
    return {k: v for k, v in positions.items() if k in node_list}

def variant2b(positions, node_list):
    # dict comprehension with O(1) set lookups
    node_list = set(node_list)
    return {k: v for k, v in positions.items() if k in node_list}

def variant3(positions, node_list):
    # iterate over the (smaller) node_list and look keys up in positions
    return {k: positions[k] for k in node_list if k in positions}



if __name__ == "__main__":
    variants = [variant1,variant1b,variant2,variant2b,variant3]
    for variant in variants:
        validate(variant)      

    n_positions = 4000
    n_node_list = 1000
    n_repeats = 100
    data = generate(n_positions, n_node_list)
    print(f"data generated with len(positions)={n_positions} and len(node_list)={n_node_list}")
    for variant in variants:
        measure(variant, data, n_repeats)

EDIT2: as requested, here are some results on my machine

first run:
data generated with len(positions)=4000 and len(node_list)=1000
variant1   took an average of 6.90ms per pass over 100 passes
variant1b  took an average of 0.22ms per pass over 100 passes
variant2   took an average of 6.95ms per pass over 100 passes
variant2b  took an average of 0.12ms per pass over 100 passes
variant3   took an average of 0.19ms per pass over 100 passes

second run:
data generated with len(positions)=40000 and len(node_list)=10000
variant1   took an average of 738.23ms per pass over 10 passes
variant1b  took an average of   2.04ms per pass over 10 passes
variant2   took an average of 739.51ms per pass over 10 passes
variant2b  took an average of   1.52ms per pass over 10 passes
variant3   took an average of   1.85ms per pass over 10 passes

Note that n=len(positions) and m=len(node_list) have been selected such that the ratio n/m=4 roughly matches that of the original data, which the OP specified as 1.2M for n and 300K for m.

Observe the effect of scaling up by a factor of 10 from the first to the second run: where variant1b is about 31 times faster than variant1 in the first run, it is about 361 times faster in the second! This is the expected result of reducing the complexity of the k in node_list check from O(m) to O(1). The total time complexity of variant1 is n*m = 0.25*n^2 = O(n^2), whereas variant1b needs only n*1 = O(n). This means that for every order of magnitude n grows, variant1b gains another order of magnitude over variant1.

That a similar performance improvement can be achieved by parallelization alone is rather doubtful: the expected gain for an embarrassingly parallel problem is at most a multiple of the number of available CPUs, which is still a constant factor and nowhere near the gain of improving the algorithm from O(n^2) to O(n).

Also, while IMHO the given problem is embarrassingly parallel, the output must still be aggregated after the parallel processing before it can be used. Furthermore, I'm quite unfamiliar with joblib, which is why I have skipped adding it to the comparison.
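For completeness, here is a minimal, untested sketch of how a chunked joblib version could look, reusing Parallel and delayed from the question; filter_chunk, parallel_filter, and the chunk count are hypothetical choices. The key point is that each worker returns its own partial dict (worker processes cannot mutate a shared positions_sub) and the partial results are aggregated afterwards:

from joblib import Parallel, delayed

def filter_chunk(items, node_set):
    # each worker filters its own chunk and returns a partial dict
    return {k: v for k, v in items if k in node_set}

def parallel_filter(positions, node_list, n_jobs=-1, n_chunks=8):
    node_set = set(node_list)
    items = list(positions.items())
    chunk_size = max(1, len(items) // n_chunks)
    chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
    partials = Parallel(n_jobs=n_jobs)(
        delayed(filter_chunk)(chunk, node_set) for chunk in chunks
    )
    positions_sub = {}
    for part in partials:  # aggregate the partial results
        positions_sub.update(part)
    return positions_sub

For this particular workload, the cost of pickling the chunks and shipping them to worker processes will most likely exceed the filtering work itself, which is one more reason to prefer the set-based fix above.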

Solution 2:[2]

You can use asyncio (documentation: https://docs.python.org/3/library/asyncio.html). It is used as a foundation for multiple Python asynchronous frameworks that provide high-performance network and web servers, database connection libraries, distributed task queues, etc. Plus, it has both high-level and low-level APIs to accommodate any kind of problem.

import asyncio
import functools

def background(f):
    def wrapped(*args, **kwargs):
        # run_in_executor only forwards positional arguments,
        # so keyword arguments are bound with functools.partial
        return asyncio.get_event_loop().run_in_executor(None, functools.partial(f, *args, **kwargs))

    return wrapped

@background
def your_function(argument):
    #code

Now this function runs in the background whenever it is called, without putting the main program into a wait state. You can use it to parallelize a for loop as well: the loop itself remains sequential, but every iteration is handed off to the executor and runs concurrently with the main program as soon as the interpreter reaches it. Note that run_in_executor(None, ...) uses the default ThreadPoolExecutor, i.e. threads rather than processes, so CPU-bound work still competes for the GIL.

For your specific case you can do:

import asyncio
import functools
import time


def background(f):
    def wrapped(*args, **kwargs):
        # bind keyword arguments with partial; run_in_executor only takes positional args
        return asyncio.get_event_loop().run_in_executor(None, functools.partial(f, *args, **kwargs))
    return wrapped

@background
def add_to_dictionary(k,v):
    time.sleep(1) # Added Sleep to better demonstrate parallelization
    print(f"function called for {k=}\n", end='')
    if k in node_list:
        positions_sub[k] = v

# Random data to demonstrate parallelization
positions = {i:i for i in range(20)}
node_list = [key for key in positions if not key%3 or not key%5]
print(f"{positions=}, {node_list=}")

positions_sub = dict()

loop = asyncio.get_event_loop() # Get the event loop of the main thread

looper = asyncio.gather(*[add_to_dictionary(k, v) for k, v in positions.items()])
results = loop.run_until_complete(looper) # Run the loop until all tasks finish


print('loop finished')
print(f"{positions_sub=}")

This produces the following output:

positions={0: 0, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7, 8: 8, 9: 9, 10: 10, 11: 11, 12: 12, 13: 13, 14: 14, 15: 15, 16: 16, 17: 17, 18: 18, 19: 19},
node_list=[0, 3, 5, 6, 9, 10, 12, 15, 18]
function called for k=0
function called for k=6
function called for k=5
function called for k=4
function called for k=2
function called for k=1
function called for k=3
function called for k=7
function called for k=11
function called for k=10
function called for k=8
function called for k=15
function called for k=14
function called for k=12
function called for k=9
function called for k=13
function called for k=19
function called for k=18
function called for k=17
function called for k=16
loop finished
positions_sub={3: 3, 6: 6, 5: 5, 0: 0, 10: 10, 15: 15, 9: 9, 12: 12, 18: 18}
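
If you also want to avoid mutating a shared dictionary from the worker threads (closer to the aggregation the question's joblib attempt needed), a variation is to have the background function return a (key, value) pair or None and build the dictionary from the gathered results. A minimal sketch reusing the decorator, loop, and data from above; lookup is a hypothetical name:

@background
def lookup(k, v):
    # return the pair if the key qualifies, otherwise None
    return (k, v) if k in node_list else None

looper = asyncio.gather(*[lookup(k, v) for k, v in positions.items()])
pairs = loop.run_until_complete(looper)
positions_sub = dict(p for p in pairs if p is not None)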

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution     Source
Solution 1   user3666197
Solution 2   Hamza