Read files with ZipFile using multiprocessing

I'm trying to read raw data from a zipfile. The structure of that file is:

  • zipfile
    • data
      • Spectral0.data
      • Spectral1.data
      • Spectral[...].data
      • Spectral300.data
    • Header

The goal is to read all Spectral[...].data files into a 2D numpy array, with Spectral0.data as the first column. The single-threaded approach takes a long time, since reading one .data file takes several seconds.

import zipfile
import numpy as np

# dimY, dimX, path and fileCount are defined elsewhere
spectralData = np.zeros(shape=(dimY, dimX), dtype=np.int16)
archive = zipfile.ZipFile(path, 'r')

for file in range(fileCount):
    spectralDataRaw = archive.read('data/Spectral' + str(file) + '.data')
    spectralData[:, file] = np.frombuffer(spectralDataRaw, np.short)

I thought using multiprocessing could speed up the process, so I read some tutorials on how to set up a multiprocessing procedure. This is what I came up with:

import zipfile
import numpy as np
import multiprocessing
from joblib import Parallel, delayed

archive = zipfile.ZipFile(path, 'r')
numCores = multiprocessing.cpu_count()

def testMult(file):
    spectralDataRaw = archive.read('data/Spectral' + str(file) + '.data')
    return np.frombuffer(spectralDataRaw, np.short)


output = Parallel(n_jobs=numCores)(delayed(testMult)(file) for file in range(fileCount))
output = np.flipud(np.rot90(np.array(output), 1, axes = (0,2)))

Using this approach I get the following error:

_RemoteTraceback: 
"""
Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\envs\devEnv2\lib\site-packages\joblib\externals\loky\backend\queues.py", line 153, in _feed
    obj_ = dumps(obj, reducers=reducers)
  File "C:\ProgramData\Anaconda3\envs\devEnv2\lib\site-packages\joblib\externals\loky\backend\reduction.py", line 271, in dumps
    dump(obj, buf, reducers=reducers, protocol=protocol)
  File "C:\ProgramData\Anaconda3\envs\devEnv2\lib\site-packages\joblib\externals\loky\backend\reduction.py", line 264, in dump
    _LokyPickler(file, reducers=reducers, protocol=protocol).dump(obj)
  File "C:\ProgramData\Anaconda3\envs\devEnv2\lib\site-packages\joblib\externals\cloudpickle\cloudpickle_fast.py", line 563, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle '_io.BufferedReader' object
"""


The above exception was the direct cause of the following exception:

Traceback (most recent call last):

  File "<ipython-input-94-c4b007eea8e2>", line 8, in <module>
    output = Parallel(n_jobs=numCores)(delayed(testMult)(file)for file in range(fileCount))

  File "C:\ProgramData\Anaconda3\envs\devEnv2\lib\site-packages\joblib\parallel.py", line 1061, in __call__
    self.retrieve()

  File "C:\ProgramData\Anaconda3\envs\devEnv2\lib\site-packages\joblib\parallel.py", line 940, in retrieve
    self._output.extend(job.get(timeout=self.timeout))

  File "C:\ProgramData\Anaconda3\envs\devEnv2\lib\site-packages\joblib\_parallel_backends.py", line 542, in wrap_future_result
    return future.result(timeout=timeout)

  File "C:\ProgramData\Anaconda3\envs\devEnv2\lib\concurrent\futures\_base.py", line 432, in result
    return self.__get_result()

  File "C:\ProgramData\Anaconda3\envs\devEnv2\lib\concurrent\futures\_base.py", line 388, in __get_result
    raise self._exception

PicklingError: Could not pickle the task to send it to the workers.

My question is: how do I set up this parallelization correctly? I've read that zipfile is not thread-safe, so I might need a different approach to get the zip contents into memory (RAM). I would rather not read the whole zipfile into memory, since the file can be quite large.
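One idea I'm considering: since a ZipFile handle is only a problem when it is shared, each worker could open its own handle on the same path. A sketch with threads (the demo archive, dimY and fileCount below are placeholders, not my real data), which may help because zlib decompression releases the GIL:

```python
import os
import tempfile
import zipfile
import numpy as np
from concurrent.futures import ThreadPoolExecutor

# Placeholder archive standing in for the real one: 4 columns of 8 int16 values.
dimY, fileCount = 8, 4
path = os.path.join(tempfile.gettempdir(), "demo_spectra.zip")
with zipfile.ZipFile(path, 'w') as zf:
    for i in range(fileCount):
        zf.writestr('data/Spectral' + str(i) + '.data',
                    (np.arange(dimY, dtype=np.int16) + i).tobytes())

def readColumn(file):
    # Every call opens its own handle, so no ZipFile object is shared
    # between threads (and nothing unpicklable is captured).
    with zipfile.ZipFile(path, 'r') as archive:
        raw = archive.read('data/Spectral' + str(file) + '.data')
    return np.frombuffer(raw, np.int16)

with ThreadPoolExecutor() as pool:
    columns = list(pool.map(readColumn, range(fileCount)))

spectralData = np.stack(columns, axis=1)  # shape (dimY, fileCount)
print(spectralData.shape)  # prints (8, 4)
```

I'm not sure how much of a speedup this gives in practice, since the threads still share one disk and one interpreter, but it at least avoids both the thread-safety and the pickling problem.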

I also thought about using from numba import njit, prange, but the problem there is that numba does not support the zipfile module. What else could I do to make this work?
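Another variant I'm experimenting with: keep worker processes, but make sure only the file index crosses the process boundary and open the archive inside the worker, so the unpicklable handle never has to be serialized. A stdlib multiprocessing sketch of that idea (paths and sizes are again placeholders):

```python
import multiprocessing as mp
import os
import tempfile
import zipfile
import numpy as np

path = os.path.join(tempfile.gettempdir(), "demo_spectra_mp.zip")

def readColumn(file):
    # Only the integer `file` is pickled and sent to the worker; the
    # ZipFile handle is created inside the process that uses it.
    with zipfile.ZipFile(path, 'r') as archive:
        raw = archive.read('data/Spectral' + str(file) + '.data')
    return np.frombuffer(raw, np.int16)

# Placeholder archive (3 columns of 4 int16 values) standing in for the real file.
with zipfile.ZipFile(path, 'w') as zf:
    for i in range(3):
        zf.writestr('data/Spectral' + str(i) + '.data',
                    np.full(4, i, dtype=np.int16).tobytes())

# "fork" keeps the sketch short; on Windows you would need "spawn"
# plus an `if __name__ == "__main__":` guard instead.
ctx = mp.get_context("fork")
with ctx.Pool() as pool:
    columns = pool.map(readColumn, range(3))

spectralData = np.stack(columns, axis=1)  # shape (4, 3)
print(spectralData.shape)  # prints (4, 3)
```

The same pattern should work with joblib by changing testMult to open its own ZipFile, since the pickling error comes from the module-level archive object being captured by the worker function.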



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
