Best way to wait for queue population (Python multiprocessing)
This is the first time I am seriously playing with parallel computing. I am using the multiprocessing module in Python, and I am running into this problem: a queue consumer runs in a different process than the queue producer, and the former should wait for the latter to finish its job before it stops iterating over the queue. Sometimes the consumer is faster than the producer and the queue stays empty; if I don't put in any condition, the program won't stop.
In the sample code I use the placeholder PRODUCER_IS_OVER to illustrate what I need. The following code sketches the problem:
def save_data(save_que, file_):
    ### Coroutine instantiation
    PRODUCER_IS_OVER = False
    empty = False
    ### Queue consumer
    while not (empty and PRODUCER_IS_OVER):
        try:
            data = save_que.get()
            print("saving data", data)
        except:
            empty = save_que.empty()
            print(empty)
            pass
        # PRODUCER_IS_OVER = get_condition()
    print("All data saved")
    return

def get_condition():
    ### NameError: global name 'PRODUCER_IS_OVER' is not defined
    if PRODUCER_IS_OVER:
        return True
    else:
        return False

def produce_data(save_que):
    for _ in range(5):
        time.sleep(random.randint(1, 5))
        data = random.randint(1, 10)
        print("sending data", data)
        save_que.put(data)

### Main function here
import random
import time
from multiprocessing import Queue, Manager, Process

manager = Manager()
save_que = manager.Queue()
file_ = "file"
save_p = Process(target=save_data, args=(save_que, file_))
save_p.start()
PRODUCER_IS_OVER = False
produce_data(save_que)
PRODUCER_IS_OVER = True
save_p.join()
produce_data takes a variable amount of time, and I want the save_p process to start BEFORE the queue is populated, so that it consumes the queue while it is being filled.
I think there are workarounds to communicate when to stop the iteration, but I want to know whether a proper way to do it exists. I tried both multiprocessing.Pipe and .Lock, but I don't know how to implement them correctly and efficiently.
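For illustration, here is a minimal sketch of the Pipe idea: a one-way pipe carries nothing but a done-signal, which the consumer polls whenever the queue looks empty. All names and the timeout value are illustrative, and this is not necessarily the proper way:

import queue  # for the queue.Empty exception raised by get(timeout=...)
from multiprocessing import Manager, Pipe, Process

def save_data(save_que, done_recv):
    # Consume until the producer has signalled completion AND the queue is drained.
    while True:
        try:
            data = save_que.get(timeout=0.5)
        except queue.Empty:
            if done_recv.poll():  # done-signal arrived and nothing is left
                break
            continue
        print("saving data", data)
    print("All data saved")

if __name__ == '__main__':
    manager = Manager()
    save_que = manager.Queue()
    done_recv, done_send = Pipe(duplex=False)  # receive end, send end
    save_p = Process(target=save_data, args=(save_que, done_recv))
    save_p.start()
    for i in range(5):
        save_que.put(i)
    done_send.send(True)  # signal: no more data will ever arrive
    save_p.join()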
SOLVED: is this the best way?
The following code implements a STOP message in the queue and works fine. I can refine it with a class, QMsg, for languages that support only static types.
def save_data(save_que, file_):
    # Coroutine instantiation
    PRODUCER_IS_OVER = False
    empty = False
    # Queue consumer
    while not (empty and PRODUCER_IS_OVER):
        data = save_que.get()
        empty = save_que.empty()
        print("saving data", data)
        if data == "STOP":
            PRODUCER_IS_OVER = True
    print("All data saved")
    return

def get_condition():
    # NameError: global name 'PRODUCER_IS_OVER' is not defined
    if PRODUCER_IS_OVER:
        return True
    else:
        return False

def produce_data(save_que):
    for _ in range(5):
        time.sleep(random.randint(1, 5))
        data = random.randint(1, 10)
        print("sending data", data)
        save_que.put(data)
    save_que.put("STOP")

# Main function here
import random
import time
from multiprocessing import Queue, Manager, Process

manager = Manager()
save_que = manager.Queue()
file_ = "file"
save_p = Process(target=save_data, args=(save_que, file_))
save_p.start()
PRODUCER_IS_OVER = False
produce_data(save_que)
PRODUCER_IS_OVER = True
save_p.join()
But this cannot work when the queue is populated by several separate processes: who is going to send the STOP message in that case?
Another solution is to store the process handles in a list and execute:
def some_alive():
    for p in processes:
        if p.is_alive():
            return True
    return False
But multiprocessing supports the .is_alive method only in the parent process, which is limiting in my case.
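One pattern that sidesteps is_alive entirely: since the parent started every producer, it can join them all and then put one sentinel per consumer itself. A minimal sketch, with illustrative names:

from multiprocessing import Manager, Process

SENTINEL = "STOP"

def consume(q):
    # Each consumer exits as soon as it reads its own sentinel.
    for data in iter(q.get, SENTINEL):
        print("saving data", data)

def produce(q, items):
    for item in items:
        q.put(item)

if __name__ == '__main__':
    manager = Manager()
    q = manager.Queue()
    consumers = [Process(target=consume, args=(q,)) for _ in range(2)]
    producers = [Process(target=produce, args=(q, range(i * 3, i * 3 + 3)))
                 for i in range(3)]
    for p in consumers + producers:
        p.start()
    for p in producers:
        p.join()           # only the parent needs to observe the producers
    for _ in consumers:
        q.put(SENTINEL)    # exactly one sentinel per consumer
    for c in consumers:
        c.join()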
Solution 1
What you're asking for is the default behavior of queue.get: it waits (blocks) until an item is available from the queue. Sending a sentinel value is indeed the preferred way to end a child process.
Your scenario could be simplified to something like this:
import random
import time
from multiprocessing import Manager, Process

def save_data(save_que, file_):
    for data in iter(save_que.get, 'STOP'):
        print("saving data", data)
    print("All data saved")
    return

def produce_data(save_que):
    for _ in range(5):
        time.sleep(random.randint(1, 5))
        data = random.randint(1, 10)
        print("sending data", data)
        save_que.put(data)
    save_que.put("STOP")

if __name__ == '__main__':
    manager = Manager()
    save_que = manager.Queue()
    file_ = "file"
    save_p = Process(target=save_data, args=(save_que, file_))
    save_p.start()
    produce_data(save_que)
    save_p.join()
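As an aside, the two-argument form of iter() used above is a documented builtin: iter(callable, sentinel) returns an iterator that calls callable repeatedly and stops as soon as the return value equals the sentinel, which itself is never yielded. A queue-free sketch of the same mechanics:

from collections import deque

buffer = deque([1, 2, 3, "STOP", 4])
for item in iter(buffer.popleft, "STOP"):
    print(item)  # prints 1, 2, 3; the sentinel ends iteration and is not yielded
print(buffer)    # deque([4]): everything after the sentinel is left untouched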
Edit to answer the question in the comment:
How should I implement the stop message in case the queue is accessed by several different agents and each one has a randomized time for finishing its task?
It's not much different: you have to put as many sentinel values into the queue as you have consumers.
A utility function that returns a configured stream logger, so you can see where the action is:
import logging  # needed when using this utility on its own

def get_stream_logger(level=logging.DEBUG):
    """Return logger with configured StreamHandler."""
    stream_logger = logging.getLogger('stream_logger')
    stream_logger.handlers = []
    stream_logger.setLevel(level)
    sh = logging.StreamHandler()
    sh.setLevel(level)
    fmt = '[%(asctime)s %(levelname)-8s %(processName)s] --- %(message)s'
    formatter = logging.Formatter(fmt)
    sh.setFormatter(formatter)
    stream_logger.addHandler(sh)
    return stream_logger
Code with multiple consumers:
import random
import time
from multiprocessing import Manager, Process
import logging

def save_data(save_que, file_):
    stream_logger = get_stream_logger()
    for data in iter(save_que.get, 'STOP'):
        time.sleep(random.randint(1, 5))  # random delay
        stream_logger.debug(f"saving: {data}")  # DEBUG
    stream_logger.debug("all data saved")  # DEBUG
    return

def produce_data(save_que, n_workers):
    stream_logger = get_stream_logger()
    for _ in range(5):
        time.sleep(random.randint(1, 5))
        data = random.randint(1, 10)
        stream_logger.debug(f"producing: {data}")  # DEBUG
        save_que.put(data)
    for _ in range(n_workers):
        save_que.put("STOP")

if __name__ == '__main__':
    file_ = "file"
    n_processes = 2
    manager = Manager()
    save_que = manager.Queue()
    processes = []
    for _ in range(n_processes):
        processes.append(Process(target=save_data, args=(save_que, file_)))
    for p in processes:
        p.start()
    produce_data(save_que, n_workers=n_processes)
    for p in processes:
        p.join()
Example output:
[2018-09-02 20:10:35,885 DEBUG MainProcess] --- producing: 2
[2018-09-02 20:10:38,887 DEBUG MainProcess] --- producing: 8
[2018-09-02 20:10:38,887 DEBUG Process-2] --- saving: 2
[2018-09-02 20:10:39,889 DEBUG MainProcess] --- producing: 8
[2018-09-02 20:10:40,889 DEBUG Process-3] --- saving: 8
[2018-09-02 20:10:40,890 DEBUG Process-2] --- saving: 8
[2018-09-02 20:10:42,890 DEBUG MainProcess] --- producing: 1
[2018-09-02 20:10:43,891 DEBUG Process-3] --- saving: 1
[2018-09-02 20:10:46,893 DEBUG MainProcess] --- producing: 5
[2018-09-02 20:10:46,894 DEBUG Process-3] --- all data saved
[2018-09-02 20:10:50,895 DEBUG Process-2] --- saving: 5
[2018-09-02 20:10:50,896 DEBUG Process-2] --- all data saved
Process finished with exit code 0
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow