Python + ZMQ: Operation cannot be accomplished in current state

I am trying to get a Python program to communicate with another Python program via ZeroMQ using the request-reply pattern. The client program should send a request to the server program, which replies.

I have two servers, so that when one server fails the other takes over. Communication works perfectly while the first server is up. However, when the first server fails and I make a request to the second server, I see this error:

zmq.error.ZMQError: Operation cannot be accomplished in current state

Code of the server 1:

# Run the server
while True:

    # Define the socket using the "Context"
    sock = context.socket(zmq.REP)
    sock.bind("tcp://127.0.0.1:5677")
    data = sock.recv().decode("utf-8")
    res = "Recvd"
    sock.send(res.encode('utf-8'))

Code of the server 2:

# Run the server
while True:

    # Define the socket using the "Context"
    sock = context.socket(zmq.REP)
    sock.bind("tcp://127.0.0.1:5877")
    data = sock.recv().decode("utf-8")
    res = "Recvd"
    sock.send(res.encode('utf-8'))

Code of client:

# ZeroMQ context for distributed messaging amongst processes
context = zmq.Context()
sock_1 = context.socket(zmq.REQ)
sock_2 = context.socket(zmq.REQ)
sock_1.connect("tcp://127.0.0.1:5677")
sock_2.connect("tcp://127.0.0.1:5877")

try:
    sock_1.send(data.encode('utf-8'), zmq.NOBLOCK)
    socks_1.setsockopt(zmq.RCVTIMEO, 1000)
    socks_1.setsockopt(zmq.LINGER, 0)
    data = socks_1.recv().decode('utf-8') #receive data from the main node  

except:
    try:
        #when server one fails
        sock_2.send(data.encode('utf-8'), zmq.NOBLOCK)
        socks_2.setsockopt(zmq.RCVTIMEO, 1000)
        socks_2.setsockopt(zmq.LINGER, 0)
        data = socks_2.recv().decode('utf-8')
    except Exception as e:
         print(str(e))

What is the problem with this approach? How can I resolve this?


Asked by: Abigail344 | Posted: 06-12-2021






Answer 1

Q: How can I resolve this?
A: Avoid the known risk of REQ/REP deadlocking!

While ZeroMQ is a powerful framework, you need to understand how it works internally to design and prototype robust, reliable distributed systems.

On closer inspection, the common REQ/REP Formal Communication Pattern can leave (and in practice does leave) the two counterparties in a mutual deadlock: each expects the other to take a step that will never happen, and there is no way to escape from the deadlocked state.
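
The strict send/receive alternation behind this deadlock can be illustrated with a toy model. The sketch below is plain Python and does not use pyzmq; ToyReqSocket and second_send_after_timeout are hypothetical names, and the class only mimics the rule that a REQ socket must receive a reply before it may send again.

```python
class ToyReqSocket:
    """Toy model of the REQ send/recv alternation; NOT the pyzmq API."""

    ERROR = "Operation cannot be accomplished in current state"

    def __init__(self):
        # False: the socket may send. True: it must receive a reply first.
        self.awaiting_reply = False

    def send(self, msg):
        if self.awaiting_reply:
            raise RuntimeError(self.ERROR)
        self.awaiting_reply = True

    def recv(self, reply=None):
        if not self.awaiting_reply:
            raise RuntimeError(self.ERROR)
        if reply is None:
            # Simulated timeout: nothing arrived, so the socket keeps waiting.
            raise TimeoutError("no reply")
        self.awaiting_reply = False
        return reply


def second_send_after_timeout():
    """Send, time out on the reply, then try to send again on the same socket."""
    sock = ToyReqSocket()
    sock.send(b"request 1")
    try:
        sock.recv(reply=None)  # the server is down, so the receive times out
    except TimeoutError:
        pass
    try:
        sock.send(b"request 2")  # rejected: reply 1 is still outstanding
    except RuntimeError as exc:
        return str(exc)
    return "sent"
```

In this model the second send is rejected because the timed-out receive never moved the socket back to its sending state, which is the situation the error message describes.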

For more illustrated details and FSA-schematic diagram, see this post

Next, a fail-over system has to survive failures of its own components. So design the distributed system's state signalling carefully and avoid as many dependencies on each element's FSA design, stepping, and blocking as possible; otherwise the fail-safe behaviour remains just an illusion.

Always handle resources with care. Do not treat the components of ZeroMQ's smart signalling/messaging as expendable disposables. That might be tolerated in academic examples, but not in production environments. You still pay the costs (time, resource allocation and de-allocation, garbage collection). As noted in the comments, never create or allocate resources without due control. A loop such as while True: .socket(); .bind(); .send(); is wrong in principle and degrades the rest of the design.
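
As a minimal sketch of the alternative, assuming the server from the question only needs to acknowledge each request, the socket can be created and bound once, outside the loop. The function name serve and the max_requests parameter are assumptions added for illustration; they are not part of the question's code.

```python
import zmq


def serve(endpoint, max_requests=None):
    """Bind one REP socket once, then answer requests in a loop.

    max_requests is a hypothetical knob that lets the loop stop after a
    fixed number of requests; None means run until interrupted.
    """
    context = zmq.Context.instance()
    sock = context.socket(zmq.REP)
    sock.setsockopt(zmq.LINGER, 0)  # do not hang on close with pending data
    sock.bind(endpoint)             # bind exactly once, before the loop
    handled = 0
    try:
        while max_requests is None or handled < max_requests:
            data = sock.recv().decode("utf-8")  # every recv ...
            sock.send("Recvd".encode("utf-8"))  # ... is paired with a send
            handled += 1
    finally:
        sock.close()
    return handled
```

Calling serve("tcp://127.0.0.1:5677") would correspond to server 1 in the question, and serve("tcp://127.0.0.1:5877") to server 2.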

Answered by: Miller484 | Posted: 07-01-2022



Answer 2

On the server side, pairing every "receive" with a "send" is critical. I faced a similar issue when a socket.send call was missing.

def zmq_listen():
    global counter
    message = socket_.recv().decode("utf-8")
    logger.info(f"[{counter}] Message: {message}")
    request = json.loads(message)
    request["msg_id"] = f"m{counter}"
    ack = {"msg_id": request["msg_id"]}
    socket_.send(json.dumps(ack).encode("utf-8"))
    return request

Answered by: Brad588 | Posted: 07-01-2022



Answer 3

Implement the Lazy Pirate pattern: when an error is caught, create a new socket from your context before trying to send the message again.

A pretty good brute-force solution is to close and reopen the REQ socket after an error.

Here is a Python example.

#
#   Author: Daniel Lundin <dln(at)eintr(dot)org>
#
from __future__ import print_function

import zmq

REQUEST_TIMEOUT = 2500
REQUEST_RETRIES = 3
SERVER_ENDPOINT = "tcp://localhost:5555"

context = zmq.Context(1)

print("I: Connecting to server…")
client = context.socket(zmq.REQ)
client.connect(SERVER_ENDPOINT)

poll = zmq.Poller()
poll.register(client, zmq.POLLIN)

sequence = 0
retries_left = REQUEST_RETRIES
while retries_left:
    sequence += 1
    request = str(sequence).encode()
    print("I: Sending (%s)" % request)
    client.send(request)

    expect_reply = True
    while expect_reply:
        socks = dict(poll.poll(REQUEST_TIMEOUT))
        if socks.get(client) == zmq.POLLIN:
            reply = client.recv()
            if not reply:
                break
            if int(reply) == sequence:
                print("I: Server replied OK (%s)" % reply)
                retries_left = REQUEST_RETRIES
                expect_reply = False
            else:
                print("E: Malformed reply from server: %s" % reply)

        else:
            print("W: No response from server, retrying…")
            # Socket is confused. Close and remove it.
            client.setsockopt(zmq.LINGER, 0)
            client.close()
            poll.unregister(client)
            retries_left -= 1
            if retries_left == 0:
                print("E: Server seems to be offline, abandoning")
                break
            print("I: Reconnecting and resending (%s)" % request)
            # Create new connection
            client = context.socket(zmq.REQ)
            client.connect(SERVER_ENDPOINT)
            poll.register(client, zmq.POLLIN)
            client.send(request)

context.term()
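
For the two-server setup in the question, the same idea might be adapted as follows. This is only a sketch under assumptions: request_with_failover is a hypothetical helper, the endpoints and the one-second timeout are taken from the question, and each attempt uses a fresh REQ socket that is closed afterwards, so no socket is left waiting for a reply that never comes.

```python
import zmq

# Endpoints and timeout taken from the question; adjust as needed.
ENDPOINTS = ["tcp://127.0.0.1:5677", "tcp://127.0.0.1:5877"]
REQUEST_TIMEOUT_MS = 1000


def request_with_failover(context, payload, endpoints=ENDPOINTS,
                          timeout_ms=REQUEST_TIMEOUT_MS):
    """Try each endpoint in order and return the first reply received.

    Hypothetical helper: a new REQ socket is created per attempt and
    always closed, so a timed-out socket is never reused.
    """
    for endpoint in endpoints:
        sock = context.socket(zmq.REQ)
        sock.setsockopt(zmq.LINGER, 0)  # discard unsent data on close
        sock.connect(endpoint)
        try:
            sock.send(payload)
            # Wait up to timeout_ms for a reply instead of blocking forever.
            if sock.poll(timeout_ms, zmq.POLLIN):
                return sock.recv()
        finally:
            sock.close()
    raise TimeoutError("no server replied")
```

A call such as request_with_failover(zmq.Context.instance(), b"hello") would try server 1 first and fall back to server 2 only if no reply arrives within the timeout.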

Answered by: Caroline914 | Posted: 07-01-2022


