Large dataset, ProcessPoolExecutor issues

PROBLEM - ProcessPoolExecutor hasn't increased speed, as confirmed by the tqdm progress rate.

I've learned enough Python to copy and/or write a program that works. Each file takes ~40 seconds to load -> filter -> write. I have ~6,800 files to work through and want a better version that uses all my processing power (6 cores). I tried to write that version (below). However, it runs slightly slower than my original function:

from concurrent.futures import ProcessPoolExecutor
from glob import glob
from json import dump
from tqdm import tqdm
from pybufrkit.decoder import Decoder, generate_bufr_message
from pybufrkit.renderer import FlatJsonRenderer

decoder = Decoder()
DIRECTORY = 'C://blah/'
files = glob(DIRECTORY+'*.bufr')
PHI_MAX, PHI_MIN, LAMBDA_MAX, LAMBDA_MIN = x,x,x,x #Integers

def load_decode_filter(file):
    '''
    Phi and Lambda ranges are ~1 degree so this eviscerates 99.7% of each file
    '''
    output_message = []
    with open(file, 'rb') as ins:
        for bufr_message in generate_bufr_message(decoder, ins.read()):
            # the [3][2] indexing is necessary for the mask below to function
            input_list = FlatJsonRenderer().render(bufr_message)[3][2]
            # keep only records whose values at indices 12 and 13 fall in range
            mask = [obj for obj in input_list
                    if (PHI_MAX > obj[12] > PHI_MIN)
                    and (LAMBDA_MAX > obj[13] > LAMBDA_MIN)]
            output_message.extend(mask)
        return output_message

def main(files_in):
    '''
    Attempt to engage all cores in loading and filtering BUFR files
    '''
    with ProcessPoolExecutor(max_workers=6) as executor:
        with tqdm(total=len(files_in), desc='files loaded',
                  position=0) as progress:
            futures = []
            for file in files_in:
                future = executor.submit(load_decode_filter(file), file)
                future.add_done_callback(lambda p: progress.update())
                futures.append(future)
            results = []
            for future in futures:
                result = future.result()
                results.append(result)
    with open(DIRECTORY+'bufrout.json', 'w', encoding='utf-8') as f_o:
        dump(results, f_o)

if __name__ == '__main__':
    main(files)
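
One thing worth checking in main above: executor.submit(load_decode_filter(file), file) calls load_decode_filter immediately in the parent process, one file at a time, and then hands the pool the returned list instead of a callable. That would leave the six workers with no decoding to do. Below is a minimal sketch of passing the callable and its argument separately. The names slow_square and run_all are hypothetical stand-ins, not part of the original program, and slow_square merely sleeps to imitate per-file work.

```python
import concurrent.futures as cf
import time


def slow_square(n):
    """Hypothetical stand-in for load_decode_filter: some work per item."""
    time.sleep(0.1)
    return n * n


def run_all(executor, func, items):
    """Submit func once per item and collect results in submission order."""
    # Pass the callable and its argument separately. Writing
    # executor.submit(func(item), item) would run func right here in the
    # calling process and give the pool its return value instead.
    futures = [executor.submit(func, item) for item in items]
    return [future.result() for future in futures]


if __name__ == '__main__':
    # max_workers=6 mirrors the six cores mentioned in the question
    with cf.ProcessPoolExecutor(max_workers=6) as pool:
        print(run_all(pool, slow_square, range(12)))
```

Whether this shortens the total run for the BUFR files depends on how much of the ~40 seconds per file is decoding (CPU-bound) rather than disk I/O. It would not be expected to reduce the time spent on any single file, only to let several files proceed at once.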

I was hoping to at least cut processing time per file.


Update, Closing:
First of all, I'd like to thank everyone who commented, as well as the answerer (I'm too new to upvote). It seems the only way to meaningfully increase efficiency would be to never decode in the first place and instead take what I want from the in-situ BUFR data. That is simply beyond my current ability (this is my first exposure to code of any kind).


I plan to keep running (and am currently running) my initial version (f.bufr in, f.bufr_.txt out) as I am able, and I'll move processed files to a subdirectory after each "run". The silver lining is that I've learned enough doing this to be able to write a program that combines all the text output into one file. Thanks again.
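
The combining step mentioned above could be sketched roughly as follows. This assumes the per-file outputs are plain UTF-8 text that can simply be concatenated, which may not match the real output format. The function name combine_text_files and the output file name are hypothetical; only the .bufr_.txt suffix comes from the description above.

```python
import os
import shutil
import tempfile
from glob import glob


def combine_text_files(pattern, out_path):
    """Concatenate every file matching pattern into out_path.

    Returns the number of input files that were combined.
    out_path must not itself match pattern, or it would be read as input
    on a later run.
    """
    paths = sorted(glob(pattern))  # sorted so the order is reproducible
    with open(out_path, 'w', encoding='utf-8') as out:
        for path in paths:
            with open(path, 'r', encoding='utf-8') as src:
                # stream the file across without loading it all into memory
                shutil.copyfileobj(src, out)
    return len(paths)


if __name__ == '__main__':
    # Small self-contained demonstration in a temporary directory
    with tempfile.TemporaryDirectory() as folder:
        for name, text in (('a.bufr_.txt', 'first\n'), ('b.bufr_.txt', 'second\n')):
            with open(os.path.join(folder, name), 'w', encoding='utf-8') as f:
                f.write(text)
        count = combine_text_files(os.path.join(folder, '*.bufr_.txt'),
                                   os.path.join(folder, 'combined.out'))
        print(count, 'files combined')
```

Giving the combined file a name that does not end in .bufr_.txt keeps it from being picked up as an input if the program is run again on the same directory.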



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
