From ec65d3b675a2d3270fc3e7f1ff89f367859c76c1 Mon Sep 17 00:00:00 2001 From: "Zhilong.JI" Date: Thu, 23 Apr 2015 17:04:33 +0800 Subject: [PATCH] Fix: mistakenly returning None in rpc.call At least two messages are needed for sending an rpc.call result: one for the result's content, and another for marking the end of the response, which is a dict with ending=True. If the order of these two messages is broken, the rpc client will first receive ending=True and return None for rpc.call, instead of the actual result. Section 4.7 of the AMQP 0-9-1 core specification explains the conditions under which ordering is guaranteed: messages published in one channel, passing through one exchange and one queue and one outgoing channel will be received in the same order that they were sent (quoted from https://www.rabbitmq.com/semantics.html). But the original implementation of ProxyCallback may use two different connections for sending the result and 'ending=True', which may randomly cause reordering of responses. By restricting each rpc.call response to a single connection, this bug should be fixed. Change-Id: Ic6bf12646c7be8757741d3c3308cccd14e741929 --- nova/openstack/common/rpc/amqp.py | 63 ++++++++++++++++++++----------------- 1 file changed, 35 insertions(+), 28 deletions(-) diff --git a/nova/openstack/common/rpc/amqp.py b/nova/openstack/common/rpc/amqp.py index cc0098c..0a126ef 100644 --- a/nova/openstack/common/rpc/amqp.py +++ b/nova/openstack/common/rpc/amqp.py @@ -219,14 +219,14 @@ class ReplyProxy(ConnectionContext): return self._reply_q -def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None, - failure=None, ending=False, log_failure=True): +def msg_reply(conf, msg_id, reply_q, connection=None, connection_pool=None, + reply=None, failure=None, ending=False, log_failure=True): """Sends a reply or an error on the channel signified by msg_id. Failure should be a sys.exc_info() tuple. 
""" - with ConnectionContext(conf, connection_pool) as conn: + def _send(conn, failure): if failure: failure = rpc_common.serialize_remote_exception(failure, log_failure) @@ -249,6 +249,12 @@ def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None, else: conn.direct_send(msg_id, rpc_common.serialize_msg(msg)) + if connection: + _send(connection, failure) + else: + with ConnectionContext(conf, connection_pool) as conn: + _send(conn, failure) + class RpcContext(rpc_common.CommonRpcContext): """Context that supports replying to a rpc.call""" @@ -266,10 +272,10 @@ class RpcContext(rpc_common.CommonRpcContext): return self.__class__(**values) def reply(self, reply=None, failure=None, ending=False, - connection_pool=None, log_failure=True): + connection=None, connection_pool=None, log_failure=True): if self.msg_id: - msg_reply(self.conf, self.msg_id, self.reply_q, connection_pool, - reply, failure, ending, log_failure) + msg_reply(self.conf, self.msg_id, self.reply_q, connection, + connection_pool, reply, failure, ending, log_failure) if ending: self.msg_id = None @@ -426,28 +432,29 @@ class ProxyCallback(_ThreadPoolWithWait): proxy we have here. """ ctxt.update_store() - try: - rval = self.proxy.dispatch(ctxt, version, method, **args) - # Check if the result was a generator - if inspect.isgenerator(rval): - for x in rval: - ctxt.reply(x, None, connection_pool=self.connection_pool) - else: - ctxt.reply(rval, None, connection_pool=self.connection_pool) - # This final None tells multicall that it is done. - ctxt.reply(ending=True, connection_pool=self.connection_pool) - except rpc_common.ClientException as e: - LOG.debug(_('Expected exception during message handling (%s)') % - e._exc_info[1]) - ctxt.reply(None, e._exc_info, - connection_pool=self.connection_pool, - log_failure=False) - except Exception: - # sys.exc_info() is deleted by LOG.exception(). 
- exc_info = sys.exc_info() - LOG.error(_('Exception during message handling'), - exc_info=exc_info) - ctxt.reply(None, exc_info, connection_pool=self.connection_pool) + with ConnectionContext(self.conf, self.connection_pool) as conn: + try: + rval = self.proxy.dispatch(ctxt, version, method, **args) + # Check if the result was a generator + if inspect.isgenerator(rval): + for x in rval: + ctxt.reply(x, None, connection=conn) + else: + ctxt.reply(rval, None, connection=conn) + # This final None tells multicall that it is done. + ctxt.reply(ending=True, connection=conn) + except rpc_common.ClientException as e: + LOG.debug(_('Expected exception during message handling (%s)') % + e._exc_info[1]) + ctxt.reply(None, e._exc_info, + connection=conn, + log_failure=False) + except Exception: + # sys.exc_info() is deleted by LOG.exception(). + exc_info = sys.exc_info() + LOG.error(_('Exception during message handling'), + exc_info=exc_info) + ctxt.reply(None, exc_info, connection=conn) class MulticallProxyWaiter(object): -- 1.7.9.5