Comment 5 for bug 1658913

Revision history for this message
lihong (lihong) wrote :

Yes,I add zmq.NOBLOCK flag to socket.send() of RequestSenderDirect class.

When send CALL message,socket's option IMMEDIATE=1.
Messages whill be queued only to completed connections.
This will cause the socket to block if there are no
other connections. We need to avoid it.
It's necessary to add NOBLOCK flag to send() function.
The function will be performed in non-blocking mode.
If the message cannot be queued on the socket,
the send() function will fail with errno set to EAGAIN.

see \eventlet\green\zmq.py:
class Socket(_Socket):
    @_wraps(_Socket.send)
    def send(self, msg, flags=0, copy=True, track=False):
        """A send method that's safe to use when multiple greenthreads
        are calling send, send_multipart, recv and recv_multipart on
        the same socket.
        """
        if flags & NOBLOCK:
            result = _Socket_send(self, msg, flags, copy, track)
            # Instead of calling both wake methods, could call
            # self.getsockopt(EVENTS) which would trigger wakeups if
            # needed.
            self._eventlet_send_event.wake()
            self._eventlet_recv_event.wake()
            return result

        # TODO: pyzmq will copy the message buffer and create Message
        # objects under some circumstances. We could do that work here
        # once to avoid doing it every time the send is retried.
        flags |= NOBLOCK
        with self._eventlet_send_lock:
            while True:
                try:
                    return _Socket_send(self, msg, flags, copy, track)
                except ZMQError as e:
                    if e.errno == EAGAIN:
                        self._eventlet_send_event.block()
                    else:
                        raise
                finally:
                    # The call to send processes 0mq events and may
                    # make the socket ready to recv. Wake the next
                    # receiver. (Could check EVENTS for POLLIN here)
                    self._eventlet_recv_event.wake()