'Python: How to efficiently open and read a zipfile from multiple processes

I'm trying to open the same .zip file from multiple processes using zipfile and multiprocessing. When I open the .zip file using the with zipfile.ZipFile(self.path, 'r') as archive: syntax, the code across multiple processes works without issues, but the preformance degrades 100-fold (in some out of topic usage patterns) compared to keeping the ZipFile opened and to call read from it (guarded by multiprocessing.Lock() instance). When I use the "lock" version from multiple processes, I end up getting zipfile.BadZipFileexception thrown.

I don't understand why.

Here's a MNWC:

import os
import time
import numpy as np
import multiprocessing
import zipfile



class ZipFileSource(object):

    def __init__(self, path, access_mode="lock"):
        self.path = path
        self.lock = multiprocessing.Lock()
        self.set_access_mode(access_mode)
        self.files = self.access( lambda archive: archive.infolist())

    def set_access_mode(self, access_mode):

        def access_lock(f):
            with self.lock:
                return f(self.archive)

        def access_file(f):
            with zipfile.ZipFile(self.path, 'r') as archive:
                return f(archive)

        if access_mode == "lock":
            print("lock")
            self.archive = zipfile.ZipFile(self.path, 'r')
            self.access = access_lock
        elif access_mode == "file":
            print("file")
            self.archive = None
            self.access = access_file



    def __del__(self):
        if self.archive is not None:
            self.archive.close()

    def __len__(self):
        return len(self.files)

    def __getitem__(self, index):
        if index >= len(self):
            raise IndexError(f'Index {index} >= len {len(self)}')
        member = self.files[index]
        data_bytes = self.access( lambda archive: archive.read(member))
        return data_bytes


dirname = os.path.join(os.path.dirname(__file__), 'test_filesource_data')

fs_small = ZipFileSource(os.path.join(dirname, 'foo_bar_small.zip'), access_mode="lock")

def read_small(i):
    return fs_small[i%len(fs_small)][42]


if __name__ == '__main__':

    n_frames = 1000
    frames = np.arange(1000, dtype='u4')

    # read_small(0)


    def f(nb_processes, access_mode):

        pool = multiprocessing.Pool(processes=nb_processes)
        fs_small.set_access_mode(access_mode)
        start = time.time()
        res = 0
        for i in pool.imap_unordered(read_small, frames):
            res += i
        delta = time.time() - start
        print(f"access_mode = {access_mode}, nb processes = {nb_processes}, res = {res}, {delta*1e3/len(frames)} ms/frame")
        return res

    f(1, "file")
    f(4, "file")
    f(4, "lock") 
    f(4, "lock") #crash

Here's my console output:

lock
file
access_mode = file, nb processes = 1, res = 110289, 0.08039402961730957 ms/frame
file
access_mode = file, nb processes = 4, res = 110289, 0.32297492027282715 ms/frame
lock
access_mode = lock, nb processes = 4, res = 110289, 0.2950408458709717 ms/frame
lock
multiprocessing.pool.RemoteTraceback: 

Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/path/to/script/test_filesource.py", line 64, in read_small
    return fs_small[i%len(fs_small)][42]
  File "/path/to/script/test_filesource.py", line 55, in __getitem__
    data_bytes = self.access( lambda archive: archive.read(member))
  File "/path/to/script/test_filesource.py", line 27, in access_lock
    return f(self.archive)
  File "/path/to/script/test_filesource.py", line 55, in <lambda>
    data_bytes = self.access( lambda archive: archive.read(member))
  File "/usr/lib/python3.6/zipfile.py", line 1337, in read
    with self.open(name, "r", pwd) as fp:
  File "/usr/lib/python3.6/zipfile.py", line 1419, in open
    % (zinfo.orig_filename, fname))
zipfile.BadZipFile: File name in directory '00000005.pkl' and header b'00000004.pkl' differ.

You can find the zipfile used to reproduced the bug here bug report



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source