diff -Nru oslo.messaging-1.3.0/debian/changelog oslo.messaging-1.3.0/debian/changelog --- oslo.messaging-1.3.0/debian/changelog 2016-01-22 21:52:28.000000000 +0800 +++ oslo.messaging-1.3.0/debian/changelog 2016-01-25 17:44:32.000000000 +0800 @@ -1,3 +1,11 @@ +oslo.messaging (1.3.0-0ubuntu1.5) trusty; urgency=medium + + * Backport fixes to support AMQP HA failover cases. + - d/p/fix-rabbit-starvation-of-connections-for-reply: + To solve nova-conductor infinite loop issue (LP: #1521958). + + -- Hui Xiang Mon, 25 Jan 2016 17:43:32 +0800 + oslo.messaging (1.3.0-0ubuntu1.4) trusty; urgency=medium * Backport upstream fix (LP: #1318721): diff -Nru oslo.messaging-1.3.0/debian/patches/fix-rabbit-starvation-of-connections-for-reply.patch oslo.messaging-1.3.0/debian/patches/fix-rabbit-starvation-of-connections-for-reply.patch --- oslo.messaging-1.3.0/debian/patches/fix-rabbit-starvation-of-connections-for-reply.patch 1970-01-01 08:00:00.000000000 +0800 +++ oslo.messaging-1.3.0/debian/patches/fix-rabbit-starvation-of-connections-for-reply.patch 2016-01-25 17:42:55.000000000 +0800 @@ -0,0 +1,145 @@ +Description: Don't hold the connection when reply fails + + This change moves the reply retry code to upper layer + to be able to release the connection while we wait between + two retries. + + In the worst scenario, a client waits for more than 30 replies + and dies/restarts, the server tries to send these 30 replies to + this client and can wait up to 60s per reply. During this time, + replies for other clients are just stuck. + + This change fixes that. 
+ + Related-bug: #1477914 + Closes-bug: #1521958 + +Author: Mehdi Abaakouk +Origin: backport, https://review.openstack.org/#/c/252361/1 + https://review.openstack.org/#/c/255530/2 +Bug: https://bugs.launchpad.net/oslo.messaging/+bug/1521958 +--- +This patch header follows DEP-3: http://dep.debian.net/deps/dep3/ +--- a/oslo/messaging/_drivers/amqp.py ++++ b/oslo/messaging/_drivers/amqp.py +@@ -340,3 +340,7 @@ + + def get_control_exchange(conf): + return conf.control_exchange ++ ++ ++class AMQPDestinationNotFound(Exception): ++ pass +--- a/oslo/messaging/_drivers/amqpdriver.py ++++ b/oslo/messaging/_drivers/amqpdriver.py +@@ -17,6 +17,7 @@ + + import logging + import threading ++import time + import uuid + + from six import moves +@@ -63,9 +64,36 @@ + conn.direct_send(self.msg_id, rpc_common.serialize_msg(msg)) + + def reply(self, reply=None, failure=None, log_failure=True): +- with self.listener.driver._get_connection() as conn: +- self._send_reply(conn, reply, failure, log_failure=log_failure) +- self._send_reply(conn, ending=True) ++ # NOTE(sileht): we read the configuration value from the driver ++ # to be able to backport this change in previous version that ++ # still have the qpid driver ++ duration = self.listener.driver.missing_destination_retry_timeout ++ timer = rpc_common.DecayingTimer(duration=duration) ++ timer.start() ++ ++ while True: ++ try: ++ with self.listener.driver._get_connection() as conn: ++ self._send_reply(conn, reply, failure, ++ log_failure=log_failure) ++ self._send_reply(conn, ending=True) ++ return ++ except rpc_amqp.AMQPDestinationNotFound: ++ if timer.check_return() > 0: ++ LOG.info("The reply %(msg_id)s cannot be sent " ++ "%(reply_q)s reply queue don't exist, " ++ "retrying..." % { ++ 'msg_id': self.msg_id, ++ 'reply_q': self.reply_q}) ++ time.sleep(0.25) ++ else: ++ LOG.info("** The reply %(msg_id)s cannot be sent " ++ "%(reply_q)s reply queue don't exist after " ++ "%(duration)s sec abandoning..." 
% { ++ 'msg_id': self.msg_id, ++ 'reply_q': self.reply_q, ++ 'duration': duration}) ++ return + + def acknowledge(self): + self.listener.msg_id_cache.add(self.unique_id) +@@ -296,6 +324,7 @@ + + + class AMQPDriverBase(base.BaseDriver): ++ missing_destination_retry_timeout = 0 + + def __init__(self, conf, url, connection_pool, + default_exchange=None, allowed_remote_exmods=[]): +--- a/oslo/messaging/_drivers/impl_rabbit.py ++++ b/oslo/messaging/_drivers/impl_rabbit.py +@@ -741,6 +741,10 @@ + """Send to a publisher based on the publisher class.""" + + def _error_callback(exc): ++ if exc.code == 404: ++ raise rpc_amqp.AMQPDestinationNotFound( ++ "Exchange doesn't exists" ) ++ + log_info = {'topic': topic, 'err_str': str(exc)} + LOG.exception(_("Failed to publish message to topic " + "'%(topic)s': %(err_str)s") % log_info) +@@ -774,30 +778,8 @@ + def direct_send(self, msg_id, msg): + """Send a 'direct' message.""" + +- timer = rpc_common.DecayingTimer(duration=60) +- timer.start() +- # NOTE(sileht): retry at least 60sec, after we have a good change +- # that the caller is really dead too... +- +- while True: +- try: +- self.publisher_send(DirectPublisher, msg_id, msg) +- except self.connection.channel_errors as exc: +- # NOTE(noelbk/sileht): +- # If rabbit dies, the consumer can be disconnected before the +- # publisher sends, and if the consumer hasn't declared the +- # queue, the publisher's will send a message to an exchange +- # that's not bound to a queue, and the message wll be lost. 
+- # So we set passive=True to the publisher exchange and catch +- # the 404 kombu ChannelError and retry until the exchange +- # appears +- if exc.code == 404 and timer.check_return() > 0: +- LOG.info(_("The exchange to reply to %s doesn't " +- "exist yet, retrying...") % msg_id) +- time.sleep(1) +- continue +- raise +- return ++ self.publisher_send(DirectPublisher, msg_id, msg) ++ return + + def topic_send(self, topic, msg, timeout=None): + """Send a 'topic' message.""" +@@ -828,6 +810,8 @@ + conf.register_opts(rabbit_opts) + conf.register_opts(rpc_amqp.amqp_opts) + ++ self.missing_destination_retry_timeout = 60 ++ + connection_pool = rpc_amqp.get_connection_pool(conf, Connection) + + super(RabbitDriver, self).__init__(conf, url, diff -Nru oslo.messaging-1.3.0/debian/patches/series oslo.messaging-1.3.0/debian/patches/series --- oslo.messaging-1.3.0/debian/patches/series 2016-01-22 21:52:28.000000000 +0800 +++ oslo.messaging-1.3.0/debian/patches/series 2016-01-25 16:26:53.000000000 +0800 @@ -4,3 +4,4 @@ 0003-Declare-DirectPublisher-exchanges-with-passive-True.patch redeclare-consumers-when-ack-requeue-fails.patch fix-reconnect-race-condition-with-rabbitmq-cluster.patch +fix-rabbit-starvation-of-connections-for-reply.patch