Python + ZMQ: Operation cannot be accomplished in current state
I am trying to get a Python program to communicate with another Python program via ZeroMQ using the request-reply pattern. The client program should send a request to the server program, which replies.
I have two servers, so that when one server fails the other takes over. Communication works perfectly while the first server is up. However, when the first server fails and I make a request to the second server, I see the error:
zmq.error.ZMQError: Operation cannot be accomplished in current state
Code of the server 1:
# Run the server
while True:
    # Define the socket using the "Context"
    sock = context.socket(zmq.REP)
    sock.bind("tcp://127.0.0.1:5677")
    data = sock.recv().decode("utf-8")
    res = "Recvd"
    sock.send(res.encode('utf-8'))
Code of the server 2:
# Run the server
while True:
    # Define the socket using the "Context"
    sock = context.socket(zmq.REP)
    sock.bind("tcp://127.0.0.1:5877")
    data = sock.recv().decode("utf-8")
    res = "Recvd"
    sock.send(res.encode('utf-8'))
Code of client:
# ZeroMQ context for distributed messaging amongst processes
context = zmq.Context()
sock_1 = context.socket(zmq.REQ)
sock_2 = context.socket(zmq.REQ)
sock_1.connect("tcp://127.0.0.1:5677")
sock_2.connect("tcp://127.0.0.1:5877")
try:
    sock_1.send(data.encode('utf-8'), zmq.NOBLOCK)
    sock_1.setsockopt(zmq.RCVTIMEO, 1000)
    sock_1.setsockopt(zmq.LINGER, 0)
    data = sock_1.recv().decode('utf-8')  # receive data from the main node
except:
    try:
        # when server one fails
        sock_2.send(data.encode('utf-8'), zmq.NOBLOCK)
        sock_2.setsockopt(zmq.RCVTIMEO, 1000)
        sock_2.setsockopt(zmq.LINGER, 0)
        data = sock_2.recv().decode('utf-8')
    except Exception as e:
        print(str(e))
What is the problem with this approach? How can I resolve this?
Asked by: Abigail344 | Posted: 06-12-2021
Answer 1
Q: How can I resolve this?
A: Avoid the known risk of REQ/REP deadlocking!
While ZeroMQ is a powerful framework, understanding its internals is necessary for designing and prototyping robust, reliable distributed systems.
On a closer look, the common REQ/REP Formal Communication Pattern may leave (and does leave) the counterparties in a mutual deadlock: each is waiting for the other to take a step that will never happen, and there is no way to escape from the deadlocked state.
For more illustrated details and an FSA schematic diagram, see this post.
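The state machine behind the error can be seen in a few lines. The following is a minimal sketch, not taken from the linked post: a REQ socket has to alternate send() and recv() strictly, so a second send() without a recv() in between raises zmq.ZMQError with errno zmq.EFSM, which is the error reported in the question. The inproc endpoint name is made up for the illustration.

```python
import zmq

context = zmq.Context()

# A REP peer on an in-process endpoint, so the REQ socket has a peer to send to.
# The endpoint name "inproc://efsm-demo" is an assumption made for this sketch.
rep = context.socket(zmq.REP)
rep.bind("inproc://efsm-demo")

req = context.socket(zmq.REQ)
req.connect("inproc://efsm-demo")

req.send(b"first request")           # allowed: a new REQ socket may send
try:
    req.send(b"second request")      # not allowed: a recv() must come first
    second_send_errno = None
except zmq.ZMQError as exc:
    second_send_errno = exc.errno    # zmq.EFSM: wrong state for this operation
    print("second send failed:", exc)

# Close all sockets of this context without waiting for unsent messages.
context.destroy(linger=0)
```

The same thing happens in the client from the question when a recv() times out: the socket is still waiting for a reply, so the next send() on it is rejected.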
Next, a fail-over system has to survive any collisions of its own components. Thus, one has to design the distributed system's state signalling well and avoid as many dependencies on element-FSA design, stepping and blocking as possible; otherwise, the fail-safe behaviour remains just an illusion.
Always handle resources with care. Do not treat components of ZeroMQ smart signalling/messaging as any kind of "expendable disposables"; doing so might be tolerated in academic examples, but not in production environments. You still have to pay the costs (time, resource allocation, de-allocation, garbage collection). As noted in the comments, never let resources be created or allocated without due control. while True: .socket(); .bind(); .send();
is brutally wrong in principle and degrades the rest of the design.
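As a hedged sketch of what this means for the servers in the question: create and bind the REP socket once, before the loop, and only call recv() and send() inside it. The function name serve and the max_requests parameter are hypothetical, added so the loop can be stopped in a test; the reply text is the one used in the question.

```python
import zmq


def serve(endpoint, max_requests=None):
    """Answer requests on `endpoint`; stop after `max_requests` if it is given."""
    context = zmq.Context.instance()   # process-wide shared context
    sock = context.socket(zmq.REP)     # created once, outside the loop
    sock.bind(endpoint)                # bound once, outside the loop
    handled = 0
    try:
        while max_requests is None or handled < max_requests:
            data = sock.recv().decode("utf-8")   # wait for one request
            sock.send("Recvd".encode("utf-8"))   # answer it before the next recv
            handled += 1
    finally:
        sock.close()                   # release the socket when the loop ends
    return handled
```

Server 1 from the question would then be started with serve("tcp://127.0.0.1:5677") and server 2 with serve("tcp://127.0.0.1:5877").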
Answer 2
On the server side, pairing every "receive" with a "send" is critical. I was facing a similar issue when the socket.send call was missing.
def zmq_listen():
    global counter
    message = socket_.recv().decode("utf-8")
    logger.info(f"[{counter}] Message: {message}")
    request = json.loads(message)
    request["msg_id"] = f"m{counter}"
    ack = {"msg_id": request["msg_id"]}
    socket_.send(json.dumps(ack).encode("utf-8"))
    return request
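One way to guard that pairing, sketched under the assumption that requests are JSON as in the snippet above: do the processing inside a try block and send a reply on every path, so a malformed message cannot leave the REP socket waiting for a send that never comes. The helper name handle_one and the reply fields ok, echo and error are illustrative and not part of any library.

```python
import json

import zmq


def handle_one(sock):
    """Receive one request on a REP socket and always send exactly one reply."""
    raw = sock.recv()
    try:
        request = json.loads(raw.decode("utf-8"))
        reply = {"ok": True, "echo": request}
    except ValueError as exc:
        # Covers invalid JSON and invalid UTF-8; the client still gets an answer.
        reply = {"ok": False, "error": str(exc)}
    sock.send(json.dumps(reply).encode("utf-8"))
    return reply
```

Because the send happens after the try block rather than inside it, a failure while parsing still results in a reply, and the socket is ready for the next recv().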
Answered by: Brad588 | Posted: 07-01-2022
Answer 3
Implement the Lazy Pirate pattern. Create a new socket from your context when an error is caught, before trying to send the message again.
The pretty good brute force solution is to close and reopen the REQ socket after an error.
Here is a Python example.
#
# Author: Daniel Lundin <dln(at)eintr(dot)org>
#
from __future__ import print_function

import zmq

REQUEST_TIMEOUT = 2500
REQUEST_RETRIES = 3
SERVER_ENDPOINT = "tcp://localhost:5555"

context = zmq.Context(1)

print("I: Connecting to server…")
client = context.socket(zmq.REQ)
client.connect(SERVER_ENDPOINT)

poll = zmq.Poller()
poll.register(client, zmq.POLLIN)

sequence = 0
retries_left = REQUEST_RETRIES
while retries_left:
    sequence += 1
    request = str(sequence).encode()
    print("I: Sending (%s)" % request)
    client.send(request)

    expect_reply = True
    while expect_reply:
        socks = dict(poll.poll(REQUEST_TIMEOUT))
        if socks.get(client) == zmq.POLLIN:
            reply = client.recv()
            if not reply:
                break
            if int(reply) == sequence:
                print("I: Server replied OK (%s)" % reply)
                retries_left = REQUEST_RETRIES
                expect_reply = False
            else:
                print("E: Malformed reply from server: %s" % reply)
        else:
            print("W: No response from server, retrying…")
            # Socket is confused. Close and remove it.
            client.setsockopt(zmq.LINGER, 0)
            client.close()
            poll.unregister(client)
            retries_left -= 1
            if retries_left == 0:
                print("E: Server seems to be offline, abandoning")
                break
            print("I: Reconnecting and resending (%s)" % request)
            # Create new connection
            client = context.socket(zmq.REQ)
            client.connect(SERVER_ENDPOINT)
            poll.register(client, zmq.POLLIN)
            client.send(request)

context.term()
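For the two-server setup in the question, the same idea can be sketched as a failover helper: each attempt uses a fresh REQ socket, so a timed-out attempt never leaves a socket in the wrong state for the next request. This is an illustrative adaptation, not part of the example above; request_with_failover and its parameters are hypothetical names.

```python
import zmq


def request_with_failover(context, endpoints, payload, timeout_ms=1000):
    """Try each endpoint in order with a fresh REQ socket.

    Returns (endpoint, reply) for the first server that answers in time.
    """
    for endpoint in endpoints:
        sock = context.socket(zmq.REQ)
        sock.setsockopt(zmq.LINGER, 0)       # drop unsent data when closing
        sock.connect(endpoint)
        try:
            sock.send(payload)
            # poll() returns a non-zero event mask once a reply is readable.
            if sock.poll(timeout_ms, zmq.POLLIN):
                return endpoint, sock.recv()
        finally:
            sock.close()                     # never reuse a socket after a timeout
    raise TimeoutError("no server replied: %s" % ", ".join(endpoints))
```

With the addresses from the question, a call would look like request_with_failover(context, ["tcp://127.0.0.1:5677", "tcp://127.0.0.1:5877"], data.encode("utf-8")). Creating a socket per request costs more than reusing one, so this trades some efficiency for simplicity.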
Answered by: Caroline914 | Posted: 07-01-2022