Failing to load model using multiprocessing on Windows

This program works on Unix and I'm trying to port it to Windows.

It uses multiprocessing, and I understand the problem comes from Windows being forced to use the "spawn" start method for multiprocessing, as opposed to "fork" on Linux.

It has to do with the subprocess loading the models that I load in the main process.

When Consumer() is called, its __init__() calls a function from another file that loads some TensorFlow models.

consumer.py (the Consumer class)

import os
import time
import sys
from PIL import Image
import subprocess
from multiprocessing import Process, Queue
import t2identify

class Consumer:
    """Process frames for one layer using two worker processes.

    Args:
        frameSource: name of the frame source, e.g. "ScreenFrames".
        layer: "screen" (loads the "apps" model into ``self.layer``) or
            "detail" (stores the plain string "detail").
    """

    def __init__(self, frameSource, layer):
        self.directory = os.environ["directory"]
        self.source = frameSource
        self.task = layer
        # NOTE: these models are created in the process that constructs the
        # Consumer. With the "spawn" start method (the only one on Windows),
        # passing a bound method such as ``self.consumeImage`` as a Process
        # target pickles this whole object -- models included -- into the
        # child, which is where the reported load failure happens.
        if layer == "screen":
            self.layer = t2identify.identifyImage("apps")
        elif layer == "detail":
            self.layer = "detail"

        self.imagesChecked = 0
        self.errorsFound = 0

        self.previousApp = "none"
        self.appDetail = {
            "mobile": t2identify.identifyImage("mobile"),
        }

    def start(self, state, queue):
        """Start two ``consumeImage`` worker processes on *queue* and wait for both.

        Args:
            state: stored on ``self.state`` before the workers are spawned.
            queue: passed as the only argument to each ``consumeImage``
                worker (presumably the queue of frames to consume).
        """
        self.state = state

        consumer1 = Process(target=self.consumeImage, args=(queue,))
        consumer1.start()

        consumer2 = Process(target=self.consumeImage, args=(queue,))
        consumer2.start()

        consumer1.join()
        consumer2.join()

t2identify.identifyImage() involves loading models.

t2identify.py

import matplotlib.pyplot as plt
import numpy as np
import os
from PIL import Image

class identifyImage():

    def __init__(self, layer):
        """Load the saved TensorFlow model and its label file for *layer*.

        Args:
            layer: name of the model to load; must be a key of the table of
                model directories below (currently only "apps").

        Raises:
            KeyError: if *layer* is not in the table of model directories.
        """
        # Imported here rather than at module level, so TensorFlow is only
        # loaded when a model is actually constructed.
        import tensorflow as tf

        model_dirs = {"apps": "C:/Users/PycharmProjects/NNs/tf/appModel"}
        model_dir = model_dirs[layer]

        self.selectedLayer = model_dir
        self.model = tf.saved_model.load(model_dir)
        self.label = self.loadLabels(model_dir + "/labels.txt")
        # Input image size, in pixels.
        self.img_height, self.img_width = 224, 224
...

I'm confident the issue is that when the consumer subprocess starts, the models loaded here are loaded again; I'm not sure why it says they can't be found.

main.py

import pathos
import multiprocessing
import os
import time
import shutil
from os import path
from producer import Producer
from consumer import Consumer
from controller import Controller
from tqdm import tqdm
import re
import sys
from env import environmentVariables as Environment

class Main:
    """Orchestrate one test run: a producer, two consumers and a controller."""

    def start(self, test):
        """Run one test session and wait for every worker process to finish.

        Args:
            test: forwarded unchanged as the only argument of
                ``Controller.start`` in the controller process.

        Returns:
            True after all processes have been joined and ``zipFiles()`` /
            ``sendLogs()`` have run; False if the controller could not be
            set up (in that case no process is started).
        """
        self.createDirectories()
        screenQueue = multiprocessing.Queue()
        detailQueue = multiprocessing.Queue()

        self.producer = Producer(["SavedFrames", "ScreenFrames", "DetailFrames", "VisualFrames"])

        # The producer process receives both queues; each consumer process
        # receives exactly one of them.
        self.producerProcess = multiprocessing.Process(
            target=self.producer.start,
            args=(self.producerEvent, self.producerFrameRate, self.state,
                  self.expected, self.iteration, self.testCaseNumber,
                  [screenQueue, detailQueue]),
        )

        # If Consumer.__init__ loads TensorFlow models (as the consumer.py in
        # the question does), those models live on the Consumer instance.
        # Under the "spawn" start method (the only one on Windows) the bound
        # method used as each Process target -- and so the instance and its
        # models -- is pickled into the child process.
        self.screenConsumer = Consumer("ScreenFrames", "screen")
        self.detailConsumer = Consumer("DetailFrames", "detail")

        self.screenConsumerProcess = multiprocessing.Process(target=self.screenConsumer.start,
                                                             args=(self.state, screenQueue))
        self.detailConsumerProcess = multiprocessing.Process(target=self.detailConsumer.start,
                                                             args=(self.state, detailQueue))

        try:
            # Build the controller, which performs the test cases, and the
            # process that will run it.
            self.controllerStart = Controller(self.producerEvent, self.producerFrameRate, self.state,
                                              self.expected, self.iteration,
                                              self.testCaseNumber, self.progress)
            self.controllerProcess = multiprocessing.Process(target=self.controllerStart.start,
                                                             args=(test,))
        except Exception as err:
            # Narrowed from a bare ``except:`` so KeyboardInterrupt and
            # SystemExit still propagate, and report the cause instead of
            # hiding it behind a generic message.
            print("ERROR", repr(err))
            return False

        self.producerProcess.start()
        # In the reported run, this start() call is where the failure shows up.
        self.screenConsumerProcess.start()
        self.detailConsumerProcess.start()
        self.controllerProcess.start()

        self.producerProcess.join()
        self.screenConsumerProcess.join()
        self.detailConsumerProcess.join()
        self.controllerProcess.join()

        self.zipFiles()
        self.sendLogs()
        return True
...
# Entry point. The __main__ guard is required for multiprocessing with the
# "spawn" start method: each child re-imports the main module, and the guard
# keeps the children from re-running this code.
if __name__ == "__main__":
    testing = Main()
    # NOTE(review): Main.start(self, test) requires a ``test`` argument, so
    # this call raises TypeError as written. The traceback in the question
    # shows the real script calling ``testing.start(data)``; pass the
    # intended test data here.
    results = testing.start()

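The error (stderr output from the parent process and from the spawned child process is interleaved below, which is why two tracebacks are mixed together):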

To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 105, in spawn_main
    exitcode = _main(fd)
  File "\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 115, in _main
    self = reduction.pickle.load(from_parent)
  File "\venv\lib\site-packages\keras\saving\pickle_utils.py", line 48, in deserialize_model_from_bytecode
    model = save_module.load_model(temp_dir)
  File "\venv\lib\site-packages\keras\utils\traceback_utils.py", line 67, in error_handler
Traceback (most recent call last):
  File "C:/Users/PycharmProjects/project/src/main.py", line 353, in <module>
    raise e.with_traceback(filtered_tb) from None
  File "\venv\lib\site-packages\tensorflow\python\saved_model\load.py", line 978, in load_internal
    results = testing.start(data)
  File "/PycharmProjects/project/src/main.py", line 95, in start
    self.screenConsumerProcess.start()
  File "\AppData\Local\Programs\Python\Python37\lib\multiprocessing\process.py", line 112, in start
    str(err) + "\n You may be trying to load on a different device "
FileNotFoundError: Unsuccessful TensorSliceReader constructor: Failed to find any matching files for ram://4e4e1c18-ece9-4d99-88c3-5c2ee965c92a/variables/variables
 You may be trying to load on a different device from the computational device. Consider setting the `experimental_io_device` option in `tf.saved_model.LoadOptions` to the io_device such as '/job:localhost'.

The loaded models aren't accessible to the subprocesses: ram://4e4e1c18-ece9-4d99-88c3-5c2ee965c92a/variables/variables is the temp_dir used in keras/saving/pickle_utils.py by `model = save_module.load_model(temp_dir)`, which is where the error occurred.

Am I running out of RAM, or do I need to change some of the multiprocessing code now that I'm on Windows?

EDIT:

I suspect it most likely has to do with Windows re-importing everything when a new process starts (spawning). While this happens the models are reloaded, and that's where the error occurs. However, I'm still unsure how to resolve this, apart from loading the models in main and then passing them into the subprocesses as parameters, which seems like a subpar solution.

EDIT2:

I'm now looking into pathos, which uses dill instead of pickle, because I suspect the issue is that when I start the consumer process, its target belongs to a class instance that is not picklable.



Solution 1:[1]

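To avoid pickling the TensorFlow models, move their creation into the process that uses them. With the spawn start method, the bound method passed as a Process target is pickled and then unpickled in the child. That carries along the Consumer instance it belongs to, including any models stored on self, and the traceback fails exactly during that unpickling. Building the models inside consumeImage means they are only ever created in the child: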

class Consumer:
    """Process frames for one layer, creating TensorFlow models lazily per worker.

    The constructor does not create any models. They are built by
    ``_load_models``, which ``consumeImage`` calls inside each worker
    process. As a result, nothing TensorFlow-related has to be pickled when
    a worker is spawned ("spawn" is the only start method on Windows).

    Args:
        frameSource: name of the frame source, e.g. "ScreenFrames".
        layer: "screen" (the worker loads the "apps" model into
            ``self.layer``) or "detail" (``self.layer`` becomes "detail").
    """

    def __init__(self, frameSource, layer):
        self.directory = os.environ["directory"]
        self.source = frameSource
        self.task = layer
        # Keep only the layer *name* here. The model for it (and the
        # ``appDetail`` models) are built in the worker by ``_load_models``.
        self._layer = layer

        self.imagesChecked = 0
        self.errorsFound = 0

        self.previousApp = "none"

    def start(self, state, queue):
        """Start two ``consumeImage`` worker processes on *queue* and wait for both.

        Args:
            state: stored on ``self.state`` before the workers are spawned.
            queue: passed as the only argument to each ``consumeImage`` worker.
        """
        self.state = state

        consumer1 = Process(target=self.consumeImage, args=(queue,))
        consumer1.start()

        consumer2 = Process(target=self.consumeImage, args=(queue,))
        consumer2.start()

        consumer1.join()
        consumer2.join()

    def _load_models(self):
        """Create this worker's models; call from inside the worker process."""
        self.appDetail = {
            "mobile": t2identify.identifyImage("mobile"),
        }
        if self._layer == "screen":
            self.layer = t2identify.identifyImage("apps")
        elif self._layer == "detail":
            self.layer = "detail"

    def consumeImage(self, queue):
        """Worker entry point: load the models locally, then consume *queue*."""
        self._load_models()
        # ... rest of the frame-processing loop (not shown)
        ...

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Booboo