diff -Nru oslo.messaging-1.4.1/debian/changelog oslo.messaging-1.4.1/debian/changelog --- oslo.messaging-1.4.1/debian/changelog 2015-09-17 19:55:37.000000000 +0800 +++ oslo.messaging-1.4.1/debian/changelog 2015-12-25 16:21:58.000000000 +0800 @@ -1,3 +1,11 @@ +oslo.messaging (1.4.1-0ubuntu1.2~cloud0ubuntu1) trusty-juno; urgency=medium + + * Backport of fix to support AMQP HA failover cases. + - d/p/0007-fix-rabbit-starvation-of-connections-for-reply.patch: + To solve nova-conductor infinite loop issue (LP: #1521958). + + -- Hui Xiang Fri, 25 Dec 2015 16:10:44 +0800 + oslo.messaging (1.4.1-0ubuntu1.2~cloud0) trusty-juno; urgency=medium * Backport of upstream fix for LP: #1338732. diff -Nru oslo.messaging-1.4.1/debian/patches/0007-fix-rabbit-starvation-of-connections-for-reply.patch oslo.messaging-1.4.1/debian/patches/0007-fix-rabbit-starvation-of-connections-for-reply.patch --- oslo.messaging-1.4.1/debian/patches/0007-fix-rabbit-starvation-of-connections-for-reply.patch 1970-01-01 08:00:00.000000000 +0800 +++ oslo.messaging-1.4.1/debian/patches/0007-fix-rabbit-starvation-of-connections-for-reply.patch 2015-12-25 16:10:25.000000000 +0800 @@ -0,0 +1,116 @@ +--- a/oslo/messaging/_drivers/amqp.py ++++ b/oslo/messaging/_drivers/amqp.py +@@ -244,3 +244,7 @@ + unique_id = uuid.uuid4().hex + msg.update({UNIQUE_ID: unique_id}) + LOG.debug('UNIQUE_ID is %s.', unique_id) ++ ++ ++class AMQPDestinationNotFound(Exception): ++ pass +--- a/oslo/messaging/_drivers/amqpdriver.py ++++ b/oslo/messaging/_drivers/amqpdriver.py +@@ -69,9 +69,37 @@ + # NOTE(Alexei_987) not sending reply, if msg_id is empty + # because reply should not be expected by caller side + return +- with self.listener.driver._get_connection() as conn: +- self._send_reply(conn, reply, failure, log_failure=log_failure) +- self._send_reply(conn, ending=True) ++ ++ # NOTE(sileht): we read the configuration value from the driver ++ # to be able to backport this change in previous version that ++ # still have the qpid 
driver ++ duration = self.listener.driver.missing_destination_retry_timeout ++ timer = rpc_common.DecayingTimer(duration=duration) ++ timer.start() ++ ++ while True: ++ try: ++ with self.listener.driver._get_connection() as conn: ++ self._send_reply(conn, reply, failure, ++ log_failure=log_failure) ++ self._send_reply(conn, ending=True) ++ return ++ except rpc_amqp.AMQPDestinationNotFound: ++ if timer.check_return() > 0: ++ LOG.info("The reply %(msg_id)s cannot be sent " ++ "%(reply_q)s reply queue don't exist, " ++ "retrying..." % { ++ 'msg_id': self.msg_id, ++ 'reply_q': self.reply_q}) ++ time.sleep(0.25) ++ else: ++ LOG.info("** The reply %(msg_id)s cannot be sent " ++ "%(reply_q)s reply queue don't exist after " ++ "%(duration)s sec abandoning..." % { ++ 'msg_id': self.msg_id, ++ 'reply_q': self.reply_q, ++ 'duration': duration}) ++ return + + def acknowledge(self): + self.listener.msg_id_cache.add(self.unique_id) +@@ -315,6 +343,7 @@ + + + class AMQPDriverBase(base.BaseDriver): ++ missing_destination_retry_timeout = 0 + + def __init__(self, conf, url, connection_pool, + default_exchange=None, allowed_remote_exmods=None): +--- a/oslo/messaging/_drivers/impl_rabbit.py ++++ b/oslo/messaging/_drivers/impl_rabbit.py +@@ -764,6 +764,10 @@ + """Send to a publisher based on the publisher class.""" + + def _error_callback(exc): ++ if exc.code == 404: ++ raise rpc_amqp.AMQPDestinationNotFound( ++ "exchange doesn't exists" ) ++ + log_info = {'topic': topic, 'err_str': exc} + LOG.exception(_("Failed to publish message to topic " + "'%(topic)s': %(err_str)s"), log_info) +@@ -797,30 +801,8 @@ + def direct_send(self, msg_id, msg): + """Send a 'direct' message.""" + +- timer = rpc_common.DecayingTimer(duration=60) +- timer.start() +- # NOTE(sileht): retry at least 60sec, after we have a good change +- # that the caller is really dead too... 
+- +- while True: +- try: +- self.publisher_send(DirectPublisher, msg_id, msg) +- except self.connection.channel_errors as exc: +- # NOTE(noelbk/sileht): +- # If rabbit dies, the consumer can be disconnected before the +- # publisher sends, and if the consumer hasn't declared the +- # queue, the publisher's will send a message to an exchange +- # that's not bound to a queue, and the message wll be lost. +- # So we set passive=True to the publisher exchange and catch +- # the 404 kombu ChannelError and retry until the exchange +- # appears +- if exc.code == 404 and timer.check_return() > 0: +- LOG.info(("The exchange to reply to %s doesn't " +- "exist yet, retrying...") % msg_id) +- time.sleep(1) +- continue +- raise +- return ++ self.publisher_send(DirectPublisher, msg_id, msg) ++ return + + def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None): + """Send a 'topic' message.""" +@@ -854,6 +836,8 @@ + conf.register_opts(rabbit_opts) + conf.register_opts(rpc_amqp.amqp_opts) + ++ self.missing_destination_retry_timeout = 60 ++ + connection_pool = rpc_amqp.get_connection_pool(conf, url, Connection) + + super(RabbitDriver, self).__init__(conf, url, diff -Nru oslo.messaging-1.4.1/debian/patches/series oslo.messaging-1.4.1/debian/patches/series --- oslo.messaging-1.4.1/debian/patches/series 2015-09-17 19:55:06.000000000 +0800 +++ oslo.messaging-1.4.1/debian/patches/series 2015-12-25 15:57:51.000000000 +0800 @@ -5,3 +5,4 @@ 0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch 0005-Fix-possible-usage-of-undefined-variable.patch 0006-Declare-DirectPublisher-exchanges-with-passive-True.patch +0007-fix-rabbit-starvation-of-connections-for-reply.patch