A case for multiprocessing?

Say I have a function that, when called, returns a lot of data coming from a device. I want to accumulate this data in a memory buffer. When the buffer reaches an arbitrarily chosen length, another function should jump in, take the buffer and perform some operation on it. The stream of data must not stop, so I am thinking of running this second function in a separate process or thread.

A quick draft would look something like this (without threading or multiprocessing implemented):

def get_data():
    ...  # read the next batch of data from the device

def get_bufferchunk(buffer):
    ...  # take the current buffer and do something with it

buffer = []

while True:

    data = get_data()
    buffer.append(data)

    if len(buffer) == 100000:
        get_bufferchunk(buffer)
        buffer = []

The function get_bufferchunk() should then run in a parallel process so that the while loop can keep running and the data stream does not get blocked.

My question is whether this is a reasonable idea and, if so, whether one would use multiprocessing or threading for this purpose. The only issue I see is that one has to make sure the code inside the function finishes before the buffer has refilled to 100000 values.



Solution 1:[1]

The comment offered by Michael Butscher, if I understand it correctly, is suggesting that you implement the buffer object as a shared memory type, perhaps a multiprocessing.sharedctypes.RawArray of some suitable type. That way, data can be passed between processes very efficiently, since the data resides in shared memory and both processes can directly "see" it without having to copy it from one address space to another. But the RawArray class supports only a limited set of data types, which may or may not be an issue for you. Also, you need to come up with a way of allowing the process that is collecting the data to keep adding to the array while the other process is processing it. This could get rather complicated.
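To make that concrete, here is a minimal sketch of the shared-memory idea for a single handoff; the function names fill_buffer and consume_buffer, the buffer length, and the use of two Event objects to coordinate the two processes are assumptions of mine, not anything from the question:

from multiprocessing import Process, Event
from multiprocessing.sharedctypes import RawArray

BUF_LEN = 100_000  # buffer length chosen for this sketch

def fill_buffer(buf, ready, done):
    # Producer: write BUF_LEN values into the shared array, then signal.
    for i in range(BUF_LEN):
        buf[i] = 1.0   # stand-in for a real get_data() call
    ready.set()        # tell the consumer the buffer is full
    done.wait()        # wait until the consumer is finished with it

def consume_buffer(buf, ready, done):
    # Consumer: wait for a full buffer, then operate on it in place.
    ready.wait()
    print('Sum of chunk =', sum(buf))
    done.set()

if __name__ == '__main__':
    buf = RawArray('d', BUF_LEN)   # shared block of BUF_LEN doubles
    ready = Event()
    done = Event()
    p1 = Process(target=fill_buffer, args=(buf, ready, done))
    p2 = Process(target=consume_buffer, args=(buf, ready, done))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

Even this single handoff needs explicit synchronization and a fixed element type, which is why the queue-based approach described next is usually simpler.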

So I would first try passing each process a multiprocessing.Queue instance. When get_bufferchunk has a full buffer (which can just be an ordinary list), it does a put of the buffer to the queue. The other process, represented for example by the worker function process_bufferchunk, loops calling get on the queue to retrieve the next buffer and process it. Depending on the type of data the buffer holds, what sort of processing process_bufferchunk does, how fast the data is coming in via get_data, etc., I would adjust what length of buffer triggers a put to the queue.

This is the general idea:

from multiprocessing import Process, Queue

def get_data():
    return 1

def get_bufferchunk(q):
    # accumulate data and put each full buffer on the queue

    buffer = []

    # So we eventually terminate:
    #while True:
    for _ in range(600_000):
        data = get_data()
        buffer.append(data)

        if len(buffer) == 100_000: # or a smaller chunk, for example 1000
            q.put(buffer)
            buffer = []
    # tell other process that there is no more data coming
    q.put(None)

def process_bufferchunk(q):
    while True:
        buffer = q.get()
        if buffer is None: # Sentinel?
            # Yes: Signal for us to terminate:
            break
        # For demo purposes just print the length:
        print('Buffer length = ', len(buffer))

def main():
    q = Queue()
    p1 = Process(target=get_bufferchunk, args=(q,))
    p2 = Process(target=process_bufferchunk, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

# Required for Windows:
if __name__ == '__main__':
    main()

Prints:

Buffer length =  100000
Buffer length =  100000
Buffer length =  100000
Buffer length =  100000
Buffer length =  100000
Buffer length =  100000
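
One thing the demo does not show is the questioner's concern about the processing keeping up with the incoming data. If that is a worry, the queue can be bounded so memory use cannot grow without limit; the trade-off is that the producer then blocks on put when the consumer falls behind. The maxsize value below is just an illustrative choice:

from multiprocessing import Queue

# A bounded queue: q.put(buffer) blocks once 10 unprocessed buffers are
# pending, so a slow consumer limits the producer instead of memory
# growing without bound. (maxsize=10 is an arbitrary value chosen here.)
q = Queue(maxsize=10)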

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1: Booboo