Python: How to efficiently open and read a zipfile from multiple processes
I'm trying to open the same .zip file from multiple processes using zipfile and multiprocessing.
When I open the .zip file using the with zipfile.ZipFile(self.path, 'r') as archive: syntax, the code works across multiple processes without issues, but performance degrades up to 100-fold (in some usage patterns beyond the scope of this question) compared to keeping the ZipFile open and calling read on it, guarded by a multiprocessing.Lock() instance.
When I use the "lock" version from multiple processes, a zipfile.BadZipFile exception ends up being thrown.
I don't understand why.
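(For context on why I suspect the shared handle: as far as I understand, a file descriptor opened before a fork shares its seek offset with the child process. A minimal sketch of that behavior, Unix-only and not part of my benchmark below:)

import os

fd = os.open(__file__, os.O_RDONLY)     # descriptor opened before the fork
pid = os.fork()
if pid == 0:                            # child: move the shared offset
    os.lseek(fd, 100, os.SEEK_SET)
    os._exit(0)
os.waitpid(pid, 0)
print(os.lseek(fd, 0, os.SEEK_CUR))     # parent prints 100: the offset moved in the child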
Here's a minimal reproducible example:
import os
import time
import numpy as np
import multiprocessing
import zipfile


class ZipFileSource(object):
    def __init__(self, path, access_mode="lock"):
        self.path = path
        self.lock = multiprocessing.Lock()
        self.set_access_mode(access_mode)
        self.files = self.access(lambda archive: archive.infolist())

    def set_access_mode(self, access_mode):
        # "lock": keep a single ZipFile open and serialize reads with a Lock
        def access_lock(f):
            with self.lock:
                return f(self.archive)

        # "file": re-open the ZipFile for every single access
        def access_file(f):
            with zipfile.ZipFile(self.path, 'r') as archive:
                return f(archive)

        if access_mode == "lock":
            print("lock")
            self.archive = zipfile.ZipFile(self.path, 'r')
            self.access = access_lock
        elif access_mode == "file":
            print("file")
            self.archive = None
            self.access = access_file

    def __del__(self):
        if self.archive is not None:
            self.archive.close()

    def __len__(self):
        return len(self.files)

    def __getitem__(self, index):
        if index >= len(self):
            raise IndexError(f'Index {index} >= len {len(self)}')
        member = self.files[index]
        data_bytes = self.access(lambda archive: archive.read(member))
        return data_bytes


dirname = os.path.join(os.path.dirname(__file__), 'test_filesource_data')
fs_small = ZipFileSource(os.path.join(dirname, 'foo_bar_small.zip'), access_mode="lock")


def read_small(i):
    return fs_small[i % len(fs_small)][42]


if __name__ == '__main__':
    n_frames = 1000
    frames = np.arange(1000, dtype='u4')
    # read_small(0)

    def f(nb_processes, access_mode):
        pool = multiprocessing.Pool(processes=nb_processes)
        fs_small.set_access_mode(access_mode)
        start = time.time()
        res = 0
        for i in pool.imap_unordered(read_small, frames):
            res += i
        delta = time.time() - start
        print(f"access_mode = {access_mode}, nb processes = {nb_processes}, "
              f"res = {res}, {delta*1e3/len(frames)} ms/frame")
        return res

    f(1, "file")
    f(4, "file")
    f(4, "lock")
    f(4, "lock")  # crash
Here's my console output:
lock
file
access_mode = file, nb processes = 1, res = 110289, 0.08039402961730957 ms/frame
file
access_mode = file, nb processes = 4, res = 110289, 0.32297492027282715 ms/frame
lock
access_mode = lock, nb processes = 4, res = 110289, 0.2950408458709717 ms/frame
lock
multiprocessing.pool.RemoteTraceback:
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "/path/to/script/test_filesource.py", line 64, in read_small
return fs_small[i%len(fs_small)][42]
File "/path/to/script/test_filesource.py", line 55, in __getitem__
data_bytes = self.access( lambda archive: archive.read(member))
File "/path/to/script/test_filesource.py", line 27, in access_lock
return f(self.archive)
File "/path/to/script/test_filesource.py", line 55, in <lambda>
data_bytes = self.access( lambda archive: archive.read(member))
File "/usr/lib/python3.6/zipfile.py", line 1337, in read
with self.open(name, "r", pwd) as fp:
File "/usr/lib/python3.6/zipfile.py", line 1419, in open
% (zinfo.orig_filename, fname))
zipfile.BadZipFile: File name in directory '00000005.pkl' and header b'00000004.pkl' differ.
You can find the zipfile used to reproduce the bug in the bug report.
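For reference, here is a sketch of a third, hypothetical access mode I'm considering (not in the script above): cache one ZipFile per worker process, keyed by os.getpid(), so that a handle opened in one process is never used from another:

import os
import zipfile

_archives = {}  # hypothetical cache: one ZipFile per process id

def access_per_process(path, f):
    # Open the archive lazily in whichever process calls in; a handle
    # created after the fork is never shared between processes.
    pid = os.getpid()
    if pid not in _archives:
        _archives[pid] = zipfile.ZipFile(path, 'r')
    return f(_archives[pid])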
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow