diff -Nru oslo.messaging-1.3.0/debian/changelog oslo.messaging-1.3.0/debian/changelog --- oslo.messaging-1.3.0/debian/changelog 2015-06-25 17:01:10.000000000 +0800 +++ oslo.messaging-1.3.0/debian/changelog 2015-12-25 15:46:31.000000000 +0800 @@ -1,3 +1,11 @@ +oslo.messaging (1.3.0-0ubuntu1.3) trusty; urgency=medium + + * Backport fixes to support AMQP HA failover cases. + - d/p/fix-rabbit-starvation-of-connections-for-reply: + Fix nova-conductor infinite loop (LP: #1521958). + + -- Hui Xiang Fri, 25 Dec 2015 15:45:09 +0800 + oslo.messaging (1.3.0-0ubuntu1.2) trusty; urgency=medium * Detect when underlying kombu connection to rabbitmq server has been diff -Nru oslo.messaging-1.3.0/debian/patches/fix-rabbit-starvation-of-connections-for-reply.patch oslo.messaging-1.3.0/debian/patches/fix-rabbit-starvation-of-connections-for-reply.patch --- oslo.messaging-1.3.0/debian/patches/fix-rabbit-starvation-of-connections-for-reply.patch 1970-01-01 08:00:00.000000000 +0800 +++ oslo.messaging-1.3.0/debian/patches/fix-rabbit-starvation-of-connections-for-reply.patch 2015-12-25 16:23:19.000000000 +0800 @@ -0,0 +1,134 @@ +Index: oslo.messaging-1.3.0/oslo/messaging/_drivers/amqp.py +=================================================================== +--- oslo.messaging-1.3.0.orig/oslo/messaging/_drivers/amqp.py 2015-12-25 15:40:18.097140076 +0800 ++++ oslo.messaging-1.3.0/oslo/messaging/_drivers/amqp.py 2015-12-25 15:40:18.093140076 +0800 +@@ -340,3 +340,7 @@ + + def get_control_exchange(conf): + return conf.control_exchange ++ ++ ++class AMQPDestinationNotFound(Exception): ++ pass +Index: oslo.messaging-1.3.0/oslo/messaging/_drivers/amqpdriver.py +=================================================================== +--- oslo.messaging-1.3.0.orig/oslo/messaging/_drivers/amqpdriver.py 2015-12-25 15:40:18.097140076 +0800 ++++ oslo.messaging-1.3.0/oslo/messaging/_drivers/amqpdriver.py 2015-12-25 15:40:18.093140076 +0800 +@@ -17,6 +17,7 @@ + + import logging + import
threading ++import time + import uuid + + from six import moves +@@ -63,9 +64,41 @@ + conn.direct_send(self.msg_id, rpc_common.serialize_msg(msg)) + + def reply(self, reply=None, failure=None, log_failure=True): +- with self.listener.driver._get_connection() as conn: +- self._send_reply(conn, reply, failure, log_failure=log_failure) +- self._send_reply(conn, ending=True) ++ if not self.msg_id: ++ # NOTE(Alexei_987) not sending reply, if msg_id is empty ++ # because reply should not be expected by caller side ++ return ++ ++ # NOTE(sileht): we read the configuration value from the driver ++ # to be able to backport this change in previous version that ++ # still have the qpid driver ++ duration = self.listener.driver.missing_destination_retry_timeout ++ timer = rpc_common.DecayingTimer(duration=duration) ++ timer.start() ++ ++ while True: ++ try: ++ with self.listener.driver._get_connection() as conn: ++ self._send_reply(conn, reply, failure, ++ log_failure=log_failure) ++ self._send_reply(conn, ending=True) ++ return ++ except rpc_amqp.AMQPDestinationNotFound: ++ if timer.check_return() > 0: ++ LOG.debug("The reply %(msg_id)s cannot be sent " ++ "%(reply_q)s reply queue don't exist, " ++ "retrying...", { ++ 'msg_id': self.msg_id, ++ 'reply_q': self.reply_q}) ++ time.sleep(0.25) ++ else: ++ LOG.info("The reply %(msg_id)s cannot be sent " ++ "%(reply_q)s reply queue don't exist after " ++ "%(duration)s sec abandoning...",
{ ++ 'msg_id': self.msg_id, ++ 'reply_q': self.reply_q, ++ 'duration': duration}) ++ return + + def acknowledge(self): + self.listener.msg_id_cache.add(self.unique_id) +@@ -296,6 +329,7 @@ + + + class AMQPDriverBase(base.BaseDriver): ++ missing_destination_retry_timeout = 0 + + def __init__(self, conf, url, connection_pool, + default_exchange=None, allowed_remote_exmods=[]): +Index: oslo.messaging-1.3.0/oslo/messaging/_drivers/impl_rabbit.py +=================================================================== +--- oslo.messaging-1.3.0.orig/oslo/messaging/_drivers/impl_rabbit.py 2015-12-25 15:40:18.097140076 +0800 ++++ oslo.messaging-1.3.0/oslo/messaging/_drivers/impl_rabbit.py 2015-12-25 16:23:11.941254276 +0800 +@@ -728,6 +728,10 @@ + """Send to a publisher based on the publisher class.""" + + def _error_callback(exc): ++ if getattr(exc, 'code', None) == 404: ++ raise rpc_amqp.AMQPDestinationNotFound( ++ "exchange doesn't exists" ) ++ + log_info = {'topic': topic, 'err_str': str(exc)} + LOG.exception(_("Failed to publish message to topic " + "'%(topic)s': %(err_str)s") % log_info) +@@ -761,30 +765,8 @@ + def direct_send(self, msg_id, msg): + """Send a 'direct' message.""" + +- timer = rpc_common.DecayingTimer(duration=60) +- timer.start() +- # NOTE(sileht): retry at least 60sec, after we have a good change +- # that the caller is really dead too... +- +- while True: +- try: +- self.publisher_send(DirectPublisher, msg_id, msg) +- except self.connection.channel_errors as exc: +- # NOTE(noelbk/sileht): +- # If rabbit dies, the consumer can be disconnected before the +- # publisher sends, and if the consumer hasn't declared the +- # queue, the publisher's will send a message to an exchange +- # that's not bound to a queue, and the message wll be lost.
+- # So we set passive=True to the publisher exchange and catch +- # the 404 kombu ChannelError and retry until the exchange +- # appears +- if exc.code == 404 and timer.check_return() > 0: +- LOG.info(_("The exchange to reply to %s doesn't " +- "exist yet, retrying...") % msg_id) +- time.sleep(1) +- continue +- raise +- return ++ self.publisher_send(DirectPublisher, msg_id, msg) ++ return + + def topic_send(self, topic, msg, timeout=None): + """Send a 'topic' message.""" +@@ -815,6 +797,8 @@ + conf.register_opts(rabbit_opts) + conf.register_opts(rpc_amqp.amqp_opts) + ++ self.missing_destination_retry_timeout = 60 ++ + connection_pool = rpc_amqp.get_connection_pool(conf, Connection) + + super(RabbitDriver, self).__init__(conf, url, diff -Nru oslo.messaging-1.3.0/debian/patches/series oslo.messaging-1.3.0/debian/patches/series --- oslo.messaging-1.3.0/debian/patches/series 2015-06-25 16:59:40.000000000 +0800 +++ oslo.messaging-1.3.0/debian/patches/series 2015-12-25 15:40:00.000000000 +0800 @@ -3,3 +3,4 @@ 0002-rabbit-fix-timeout-timer-when-duration-is-None.patch 0003-Declare-DirectPublisher-exchanges-with-passive-True.patch redeclare-consumers-when-ack-requeue-fails.patch +fix-rabbit-starvation-of-connections-for-reply.patch