python zeromq Parallel Pipeline multiple consumers (workers)

Hello everyone!

I would like to combine data from different Python programs via ZeroMQ. I think the best solution for that job would be parallel pipelines, as described in this example under the point "3. Parallel Pipeline (parallel pipeline mode)".

To begin with, I wanted to test the functionality with very simple examples. For that I used the three patterns producer, consumer and result collector that you can find in the example. I only made small changes:

Producer

import zmq

context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5566")

work_message = "start working"
socket.send_json(work_message)

Consumer_1

import random
import zmq


context = zmq.Context()
consumer_id = 199

#receiving work
consumer_receiver = context.socket(zmq.PULL)
consumer_receiver.connect("tcp://localhost:5566")

#forewarding results
consumer_sender = context.socket(zmq.PUSH)
consumer_sender.bind("tcp://*:5500")

msg = consumer_receiver.recv_json()

for i in (0, 100):
    if msg == "start working":
        data = "id: " + str(consumer_id) + "; Hello"
        consumer_sender.send_json(data)

Result Collector

#coding:utf-8

import zmq

context = zmq.Context()
result_receiver = context.socket(zmq.PULL)
result_receiver.connect("tcp://localhost:5599")

result = result_receiver.recv_json()

collected_data = []

for i in (0, 10000):
    collected_data.append(result)
    
print(collected_data)

So the basic communication between them is working. But now I've tried to add another consumer (worker) that delivers data. It should be possible, as you can see in the example I've linked. Here is my second consumer (worker):

Consumer_2

import random
import zmq


context = zmq.Context()
consumer_id = 10

#receiving work
consumer_receiver = context.socket(zmq.PULL)
consumer_receiver.connect("tcp://localhost:5566")

#forewarding results
consumer_sender = context.socket(zmq.PUSH)
consumer_sender.bind("tcp://*:5599")

msg = consumer_receiver.recv_json()

for i in (0, 100):
    if msg == "start working":
        data = "id: " + str(consumer_id) + "; World"
        consumer_sender.send_json(data)

It's pretty much the same as Consumer_1. But when I run both consumers, I get the error message "ZMQError: Address in use":

---------------------------------------------------------------------------
ZMQError                                  Traceback (most recent call last)
<ipython-input-1-af8297fe1137> in <module>
     12 #forewarding results
     13 consumer_sender = context.socket(zmq.PUSH)
---> 14 consumer_sender.bind("tcp://*:5599")
     15 
     16 msg = consumer_receiver.recv_json()

~\Anaconda3\lib\site-packages\zmq\sugar\socket.py in bind(self, addr)
    171 
    172         """
--> 173         super().bind(addr)
    174         return self._bind_cm(addr)
    175 

zmq/backend/cython/socket.pyx in zmq.backend.cython.socket.Socket.bind()

~\Anaconda3\lib\site-packages\zmq\backend\cython\checkrc.pxd in zmq.backend.cython.checkrc._check_rc()

ZMQError: Address in use

Where is my fault? It has to be possible to push from different programs to a single port (port 5599 in the example), or am I wrong? I am pretty new to programming in general, so sorry if you think this is a stupid question.

It would be very nice if you guys could help me out here.

Thanks a lot!



Solution 1:[1]

Q : "Where is my fault? ... Okay understood, but what shall I do then?"

A :
The safest way (as even the ZeroMQ default values can and do evolve from version to version) is to design in some level of self-protecting robustness.

I have had my own code deadlock the hardware, with a reboot as the tool of last resort to recover from ill-designed code, so if you can learn from my mistakes, all the better.

These templates are not one-size-fits-all, yet they provide a structure for more elaborate, self-protecting, near-real-time capable code. Even when something crashes, keep to the rule of thumb of gracefully dismantling and releasing all blocked resources back to the O/S, so that the next time the process gets re-launched it will find its "own" .bind()-ready ports free, ready to re-.bind() a ZeroMQ AccessPoint to the same port. That will not be the case if the port was not gracefully released and zmq.LINGER, left at a value that allows it, kept a Context()-instance in an infinite waiting loop. Do not rely on the implicit default value of the "current" version: it might change again, and any assumption based on it would simply cease to work.
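
The "Address in use" behaviour is not specific to ZeroMQ; it comes from the operating system's TCP layer. As a minimal sketch using only the standard library `socket` module (not ZeroMQ itself; the helper name `try_bind` is made up for this illustration), the following shows that a second bind to an occupied port fails, and that the port becomes bindable again once the first socket has been closed:

```python
import errno
import socket


def try_bind(port, host="127.0.0.1"):
    """Return a bound, listening TCP socket, or None if the port is taken."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.bind((host, port))
        sock.listen()
        return sock
    except OSError as exc:
        sock.close()
        if exc.errno == errno.EADDRINUSE:
            return None          # somebody else already owns this port
        raise


first = try_bind(0)              # port 0: let the O/S pick a free port
port = first.getsockname()[1]

second = try_bind(port)          # same port while 'first' is still open
print("second bind succeeded:", second is not None)

first.close()                    # release the port back to the O/S
third = try_bind(port)           # the port can now be bound again
print("re-bind after close succeeded:", third is not None)
third.close()
```

This is why only one side of a ZeroMQ link should .bind() a given port, and why closing sockets on every exit path matters for the next launch.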

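Only one socket can .bind() to a given TCP port at a time; a second .bind() on the same port raises "Address in use". So the collector is the single .bind()-side and every consumer .connect()-s to it. The template for the collector should look like this :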

import zmq

context = zmq.Context()
#------------.Context()-call error-handling
...
result_receiver = context.socket( zmq.PULL )
#------------------------.socket()-call error-handling
...
result_receiver.setsockopt( zmq.LINGER,  0 )    # a default value might change
#                                               # in future versions, as it did already
#--------------.setsockopt() settings for other attributes
...
result_receiver.bind( "tcp://*:5599" )          # the .bind()-side, many .connect()-s arrive here
#--------------.bind() error-handling
...
collected_data = []
poll_will_not_block_longer_than_ms = 5
try:
    #------------------------------------- INTENDED PATH:
    for i in range( 0, 10000 ):
        if ( 0 == result_receiver.poll( poll_will_not_block_longer_than_ms ) ):
             #--------------.poll() has nothing to deliver on .recv()
             # do some maintenance work
             ...
             # do some "right-sized" sleep ( by a countdown residual et al )
        else:
             #--------------.poll() SIG'd a .recv()-able message:
             result = result_receiver.recv_json()
             #-----------------------.recv_json()-call error-handling
             ...
             collected_data.append( result )
             #-------------.append() can silently grow till RAM exhausted
             #                       can kill the O/S, be careful in production
     #---------------------------------------------------
except KeyboardInterrupt:
     #aKeyboardInterrupt Handler PATH:
     ...
except zmq.ZMQError:
     #a ZMQError ( or any other specific Exception ) Handler PATH:
     ...
except:
     #all remaining Exceptions' handler code:
     ...

finally:
     #----------------------------------------- CORE ZeroMQ TIDY-UP CODE:
     result_receiver.close()
     context.term()
     ...
#---------------------------------------------- all resources gracefully released

print( collected_data )
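
The comments in the collector template warn that .append() on a plain list can grow until RAM is exhausted. One possible guard, sketched here under the assumption that dropping the oldest results is acceptable (the cap `MAX_RESULTS` is a made-up value for illustration), is a bounded `collections.deque`:

```python
from collections import deque

MAX_RESULTS = 3                      # hypothetical cap, tune for production

# A deque with maxlen silently discards the oldest item once it is full,
# so memory use stays bounded no matter how many results arrive.
collected_data = deque(maxlen=MAX_RESULTS)

for i in range(5):                   # stands in for the .recv_json() loop
    collected_data.append("id: 199; Hello #" + str(i))

print(list(collected_data))          # only the newest MAX_RESULTS items remain
```

If no result may be lost, flushing batches to disk or a database would be the alternative; the deque only bounds memory.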

Similarly, the template for the consumer-forwarder should look like this :

import zmq

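DOWNLINK_URL = "tcp://localhost:5566"   # .connect() needs a concrete host, not the "*" wildcard
UPLINK_URL   = "tcp://localhost:5599"   # the collector is the one who .bind()-s this port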

context = zmq.Context()
#------------.Context()-call error-handling
...
consumer_receiver = context.socket( zmq.PULL )
#--------------------------.socket()-call error-handling
...
consumer_receiver.setsockopt( zmq.LINGER,  0 )    # a default value might change
#                                                 # in future versions, as it did already
#----------------.setsockopt() settings for other attributes
...
consumer_receiver.connect( DOWNLINK_URL )
#----------------.connect()-call error-handling
...
consumer_sender = context.socket( zmq.PUSH )
#------------------------.socket()-call error-handling
...
consumer_sender.setsockopt( zmq.LINGER,  0 )    # a default value might change
#                                               # in future versions, as it did already
#--------------.setsockopt() settings for other attributes
...
consumer_sender.connect( UPLINK_URL )
#--------------.connect()-call error-handling
...
consumer_id = 199
poll_will_not_block_longer_than_ms = 5
try:
    #------------------------------------- INTENDED PATH:
    for i in range( 0, 10000 ):
        if ( 0 == consumer_receiver.poll( poll_will_not_block_longer_than_ms ) ):
             #---------------------.poll() has nothing to deliver on .recv()
             # do some maintenance work
             ...
             # do some "right-sized" sleep ( by a countdown residual et al )
        else:
             #---------------------.poll() SIG'd a .recv()-able message:
             result = consumer_receiver.recv_json()
             #-------------------------.recv_json()-call error-handling
             ...
             data = "id: " + str( consumer_id ) + "; Hello"
             consumer_sender.send_json( data, zmq.NOBLOCK )
             #--------------.send_json()-call error-handling
             ...
     #---------------------------------------------------
except KeyboardInterrupt:
     #aKeyboardInterrupt Handler PATH:
     ...
except zmq.ZMQError:
     #a ZMQError ( or any other specific Exception ) Handler PATH:
     ...
except:
     #all remaining Exceptions' handler code:
     ...

finally:
     #----------------------------------------- CORE ZeroMQ TIDY-UP CODE:
     for aSocket in ( consumer_receiver,
                      consumer_sender ):
         aSocket.close()
     
     context.term()
     ...
#---------------------------------------------- all resources gracefully released
...
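
To try the one-.bind(), many-.connect() layout without starting separate programs, a minimal single-process sketch might look like the following. It assumes pyzmq is installed; the function name `run_pipeline`, the `n_workers` parameter and the dict-shaped message are illustrative choices, and `bind_to_random_port` is used only so that the sketch cannot collide with a port that is already taken:

```python
import zmq


def run_pipeline(n_workers=2, url="tcp://127.0.0.1"):
    """One PULL collector binds once; every PUSH worker connects to it."""
    context = zmq.Context()
    collector = context.socket(zmq.PULL)
    collector.setsockopt(zmq.LINGER, 0)
    port = collector.bind_to_random_port(url)    # the single .bind() side

    workers = []
    results = []
    try:
        for _ in range(n_workers):
            sender = context.socket(zmq.PUSH)
            sender.setsockopt(zmq.LINGER, 0)
            sender.connect("{}:{}".format(url, port))   # many .connect() sides
            workers.append(sender)

        for worker_id, sender in enumerate(workers):
            sender.send_json({"id": worker_id, "text": "Hello"})

        while len(results) < n_workers:
            if collector.poll(1000) == 0:        # nothing within 1 s: give up
                break
            results.append(collector.recv_json())
    finally:
        # Release every socket and the context on all exit paths.
        for sock in workers + [collector]:
            sock.close()
        context.term()
    return results


if __name__ == "__main__":
    print(run_pipeline())
```

In a real deployment each worker would be its own process, as in the templates above; the single-process form only makes the bind/connect roles easy to check.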

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: user3666197