python zeromq Parallel Pipeline multiple consumers (workers)
Hello everyone!
I would like to combine data from different Python programs via ZeroMQ. I think the best solution for that job is a parallel pipeline, as described in this example under point "3. Parallel Pipeline (parallel pipeline mode)".
To start, I wanted to test the functionality with very simple examples, so I used the three parts producer, consumer and resultcollector that you can find in the example and only made small changes:
Producer
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5566")
work_message = "start working"
socket.send_json(work_message)
Consumer_1
import random
import zmq
context = zmq.Context()
consumer_id = 199
#receiving work
consumer_receiver = context.socket(zmq.PULL)
consumer_receiver.connect("tcp://localhost:5566")
#forwarding results
consumer_sender = context.socket(zmq.PUSH)
consumer_sender.bind("tcp://*:5500")
msg = consumer_receiver.recv_json()
for i in (0, 100):
    if msg == "start working":
        data = "id: " + str(consumer_id) + "; Hello"
        consumer_sender.send_json(data)
Result Collector
#coding:utf-8
import zmq
context = zmq.Context()
result_receiver = context.socket(zmq.PULL)
result_receiver.connect("tcp://localhost:5599")
result = result_receiver.recv_json()
collected_data = []
for i in (0, 10000):
    collected_data.append(result)
print(collected_data)
So the main communication between them is working. But now I've tried to add another consumer (worker) that delivers data, which should be possible according to the example I've linked. Here is my second consumer (worker):
Consumer_2
import random
import zmq
context = zmq.Context()
consumer_id = 10
#receiving work
consumer_receiver = context.socket(zmq.PULL)
consumer_receiver.connect("tcp://localhost:5566")
#forwarding results
consumer_sender = context.socket(zmq.PUSH)
consumer_sender.bind("tcp://*:5599")
msg = consumer_receiver.recv_json()
for i in (0, 100):
    if msg == "start working":
        data = "id: " + str(consumer_id) + "; World"
        consumer_sender.send_json(data)
It's pretty much the same as Consumer_1. But when I want to run both consumers, I get the error message "ZMQError: Address in use":
---------------------------------------------------------------------------
ZMQError Traceback (most recent call last)
<ipython-input-1-af8297fe1137> in <module>
12 #forewarding results
13 consumer_sender = context.socket(zmq.PUSH)
---> 14 consumer_sender.bind("tcp://*:5599")
15
16 msg = consumer_receiver.recv_json()
~\Anaconda3\lib\site-packages\zmq\sugar\socket.py in bind(self, addr)
171
172 """
--> 173 super().bind(addr)
174 return self._bind_cm(addr)
175
zmq/backend/cython/socket.pyx in zmq.backend.cython.socket.Socket.bind()
~\Anaconda3\lib\site-packages\zmq\backend\cython\checkrc.pxd in zmq.backend.cython.checkrc._check_rc()
ZMQError: Address in use
Where is my fault? It has to be possible to push from different programs to a single port (port 5599 as in the example), or am I wrong? I am pretty new to programming, so sorry if you think this is a stupid question.
It would be very nice if you guys could help me out here.
Thanks a lot!
Solution 1:[1]
Q : "Where is my fault? ... Okay understood, but what shall I do then?"
A :
The safest way ( as even the ZeroMQ default values can and do evolve from version to version ) is to design in some level of self-protecting robustness.
I have experienced my own code deadlocking the hardware, where a reboot was the tool of last resort to repair an ill-designed piece of code - so the more you can learn from my mistakes, the better.
While these templates are not one-size-fits-all, they provide a structure for further elaborated, self-protecting, near-real-time capable distributed computing. Even in cases where something crashes, keep to a rule of thumb: gracefully dismantle and release all blocked resources back to the O/S, so that the next time the process gets re-launched, it will find its "own" .bind()-ready ports free and ready to re-.bind() a ZeroMQ AccessPoint to the same port. That is not the case if the resources were not gracefully released and zmq.LINGER kept a Context()-instance in an infinite waiting loop because it was not set properly - do not rely on the "current" version's implicit default value, as it might change again to some other default and any assumptions based on it will simply cease to work.
The template for the collector should look like this :
import zmq

context = zmq.Context()
#------------.Context()-call error-handling
...
result_receiver = context.socket( zmq.PULL )
#------------------------.socket()-call error-handling
...
result_receiver.setsockopt( zmq.LINGER, 0 )    # a default value might change in future versions, as it did already
#--------------.setsockopt() settings for other attributes
...
result_receiver.bind( "tcp://localhost:5599" ) # a BIND()-side, many .connect()-s here
#--------------.bind() error-handling
...
collected_data = []
poll_will_not_block_longer_than_ms = 5

try:
    #------------------------------------- INTENDED PATH:
    for i in range( 0, 10000 ):
        if 0 == result_receiver.poll( poll_will_not_block_longer_than_ms ):
            #--------------.poll() has nothing to deliver on .recv()
            # do some maintenance work
            ...
            # do some "right-sized" sleep ( by a countdown residual et al )
        else:
            #--------------.poll() SIG'd a .recv()-able message:
            result = result_receiver.recv_json()
            #-----------------------.recv_json()-call error-handling
            ...
            collected_data.append( result )
            #-------------.append() can silently grow till RAM is exhausted
            #              and can kill the O/S, be careful in production
    #---------------------------------------------------
except KeyboardInterrupt:
    # aKeyboardInterrupt handler PATH:
    ...
except zmq.ZMQError:
    # a specific-exception handler PATH ( add further except-clauses as needed ):
    ...
except:
    # all remaining exceptions' handler code:
    ...
finally:
    #----------------------------------------- CORE ZeroMQ TIDY-UP CODE:
    result_receiver.close()
    context.term()
    #---------------------------------------------- all resources gracefully released

print( collected_data )
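With this layout the collector is the stable .bind()-side of the results pipe: any number of consumers can come and go without fighting over port 5599, the .poll()-loop keeps the process responsive instead of blocking forever in .recv_json(), and the LINGER-0 setting together with the finally:-block ensures the port is released back to the O/S when the collector stops.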
Similarly, the template for the consumer-forwarder should look like this :
import zmq

DOWNLINK_URL = "tcp://localhost:5566"   # the producer .bind()-s this port
UPLINK_URL   = "tcp://localhost:5599"   # the collector .bind()-s this port

context = zmq.Context()
#------------.Context()-call error-handling
...
consumer_receiver = context.socket( zmq.PULL )
#--------------------------.socket()-call error-handling
...
consumer_receiver.setsockopt( zmq.LINGER, 0 )   # a default value might change in future versions, as it did already
#----------------.setsockopt() settings for other attributes
...
consumer_receiver.connect( DOWNLINK_URL )
#----------------.connect()-call error-handling
...
consumer_sender = context.socket( zmq.PUSH )
#------------------------.socket()-call error-handling
...
consumer_sender.setsockopt( zmq.LINGER, 0 )     # a default value might change in future versions, as it did already
#--------------.setsockopt() settings for other attributes
...
consumer_sender.connect( UPLINK_URL )
#--------------.connect()-call error-handling
...
consumer_id = 199
poll_will_not_block_longer_than_ms = 5

try:
    #------------------------------------- INTENDED PATH:
    for i in range( 0, 10000 ):
        if 0 == consumer_receiver.poll( poll_will_not_block_longer_than_ms ):
            #---------------------.poll() has nothing to deliver on .recv()
            # do some maintenance work
            ...
            # do some "right-sized" sleep ( by a countdown residual et al )
        else:
            #---------------------.poll() SIG'd a .recv()-able message:
            result = consumer_receiver.recv_json()
            #-------------------------.recv_json()-call error-handling
            ...
            data = "id: " + str( consumer_id ) + "; Hello"
            consumer_sender.send_json( data, zmq.NOBLOCK )
            #--------------.send_json()-call error-handling
            ...
    #---------------------------------------------------
except KeyboardInterrupt:
    # aKeyboardInterrupt handler PATH:
    ...
except zmq.ZMQError:
    # a specific-exception handler PATH ( add further except-clauses as needed ):
    ...
except:
    # all remaining exceptions' handler code:
    ...
finally:
    #----------------------------------------- CORE ZeroMQ TIDY-UP CODE:
    for aSocket in ( consumer_receiver,
                     consumer_sender ):
        aSocket.close()
    context.term()
    #---------------------------------------------- all resources gracefully released
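For completeness, a producer matching these polling templates would typically keep pushing work messages rather than sending a single one, otherwise the consumers' .poll()-loops mostly idle. The following is only a sketch under that assumption - the producer in the question sends just one "start working" message - reusing its port 5566 and adding the same LINGER-0 / tidy-up discipline:
import time
import zmq

context = zmq.Context()

producer_sender = context.socket( zmq.PUSH )
producer_sender.setsockopt( zmq.LINGER, 0 )
producer_sender.bind( "tcp://*:5566" )                # the producer remains the .bind()-side of the work pipe

try:
    for i in range( 0, 100 ):
        producer_sender.send_json( "start working" )  # PUSH load-balances the messages over all connected consumers
        time.sleep( 0.1 )
except KeyboardInterrupt:
    pass
finally:
    producer_sender.close()
    context.term()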
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution   | Source
-----------|------------
Solution 1 | user3666197