diff -Nru oslo.messaging-1.4.1/debian/changelog oslo.messaging-1.4.1/debian/changelog --- oslo.messaging-1.4.1/debian/changelog 2014-09-26 09:29:59.000000000 -0700 +++ oslo.messaging-1.4.1/debian/changelog 2015-06-25 16:22:46.000000000 -0700 @@ -1,3 +1,23 @@ +oslo.messaging (1.4.1-0ubuntu2) utopic; urgency=medium + + * Backport fixes for reliable AMQP reconnect support, ensuring + nova-compute instances re-connect and message correctly when + RabbitMQ message brokers disappear in clustered configurations: + - d/p/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch: + Improve precision of iterconsume timeouts (LP: #1400268). + - d/p/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch: + Fix timeout timer when duration is set to None (LP: #1408370). + - d/p/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch: + Ensure that message publishers fail and retry if the consumer has + not yet declared a receiving queue (LP: #1338732). + - d/p/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch: + Redeclare consumers when the ack/requeue fails (LP: #1448650). + - d/p/0005-Fix-possible-usage-of-undefined-variable.patch: + Fix use of a variable before its definition in the reconnect + path when an exception is encountered (LP: #1465314). + + -- Billy Olsen Thu, 25 Jun 2015 16:22:19 -0700 + oslo.messaging (1.4.1-0ubuntu1) utopic; urgency=medium * New upstream final release for OpenStack Juno: diff -Nru oslo.messaging-1.4.1/debian/patches/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch oslo.messaging-1.4.1/debian/patches/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch --- oslo.messaging-1.4.1/debian/patches/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch 1969-12-31 17:00:00.000000000 -0700 +++ oslo.messaging-1.4.1/debian/patches/0001-Fix-_poll_connection-not-timeout-issue-1-2.patch 2015-06-25 16:19:10.000000000 -0700 @@ -0,0 +1,197 @@ +From 273d9d94df46c00f9712dbdb2d683ab19ee26b52 Mon Sep 17 00:00:00 2001 +From: Jie Li +Date: Wed, 4 Mar 2015
15:42:13 +0800 +Subject: [PATCH 1/4] Fix _poll_connection not timeout issue (1/2) + +_poll_connection could fall into a loop waiting for a reply message, if +rabbit dies and up. This commit will set up a rpc_response_timeout timer +for one connection polling; so the rpc will finally jump out with a +timeout exception which is expected in such scenario. + +Related bug: #1338732 + +The title of the commit in master was: + rabbit: more precise iterconsume timeout + +but this was changed since it didn't describe the actual change. +This commit resolved some conflicts due to cherry-pick. + +Change-Id: I157dab80cdb4afcf9a5f26fa900f96f0696db502 +(cherry picked from commit 023b7f44e2ccd77a7e9ee9ee78431a4646c88f13) +(cherry picked from commit 858353394dd15101ebcb1c4b39e672bc4aa2bd86) +--- + oslo/messaging/_drivers/amqpdriver.py | 24 ++++++++++++++++-------- + oslo/messaging/_drivers/common.py | 25 +++++++++++++++++++++++++ + oslo/messaging/_drivers/impl_rabbit.py | 27 +++++++++++++++++++-------- + 3 files changed, 60 insertions(+), 16 deletions(-) + +diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py +index f9f1b06..2760a8a 100644 +--- a/oslo/messaging/_drivers/amqpdriver.py ++++ b/oslo/messaging/_drivers/amqpdriver.py +@@ -26,6 +26,7 @@ from oslo import messaging + from oslo.messaging._drivers import amqp as rpc_amqp + from oslo.messaging._drivers import base + from oslo.messaging._drivers import common as rpc_common ++from oslo.messaging.openstack.common.gettextutils import _ + + LOG = logging.getLogger(__name__) + +@@ -202,6 +203,11 @@ class ReplyWaiter(object): + def unlisten(self, msg_id): + self.waiters.remove(msg_id) + ++ @staticmethod ++ def _raise_timeout_exception(msg_id): ++ raise messaging.MessagingTimeout( ++ _('Timed out waiting for a reply to message ID %s.') % msg_id) ++ + def _process_reply(self, data): + result = None + ending = False +@@ -216,7 +222,7 @@ class ReplyWaiter(object): + result = data['result'] + return 
result, ending + +- def _poll_connection(self, msg_id, timeout): ++ def _poll_connection(self, msg_id, timer): + while True: + while self.incoming: + message_data = self.incoming.pop(0) +@@ -227,15 +233,15 @@ class ReplyWaiter(object): + + self.waiters.put(incoming_msg_id, message_data) + ++ timeout = timer.check_return(self._raise_timeout_exception, msg_id) + try: + self.conn.consume(limit=1, timeout=timeout) + except rpc_common.Timeout: +- raise messaging.MessagingTimeout('Timed out waiting for a ' +- 'reply to message ID %s' +- % msg_id) ++ self._raise_timeout_exception(msg_id) + +- def _poll_queue(self, msg_id, timeout): +- message = self.waiters.get(msg_id, timeout) ++ def _poll_queue(self, msg_id, timer): ++ timeout = timer.check_return(self._raise_timeout_exception, msg_id) ++ message = self.waiters.get(msg_id, timeout=timeout) + if message is self.waiters.WAKE_UP: + return None, None, True # lock was released + +@@ -264,6 +270,8 @@ class ReplyWaiter(object): + # have the first thread take responsibility for passing replies not + # intended for itself to the appropriate thread. 
+ # ++ timer = rpc_common.DecayingTimer(duration=timeout) ++ timer.start() + final_reply = None + while True: + if self.conn_lock.acquire(False): +@@ -282,7 +290,7 @@ class ReplyWaiter(object): + + # Now actually poll the connection + while True: +- reply, ending = self._poll_connection(msg_id, timeout) ++ reply, ending = self._poll_connection(msg_id, timer) + if not ending: + final_reply = reply + else: +@@ -295,7 +303,7 @@ class ReplyWaiter(object): + self.waiters.wake_all(msg_id) + else: + # We're going to wait for the first thread to pass us our reply +- reply, ending, trylock = self._poll_queue(msg_id, timeout) ++ reply, ending, trylock = self._poll_queue(msg_id, timer) + if trylock: + # The first thread got its reply, let's try and take over + # the responsibility for polling +diff --git a/oslo/messaging/_drivers/common.py b/oslo/messaging/_drivers/common.py +index 71ca02b..24fee5e 100644 +--- a/oslo/messaging/_drivers/common.py ++++ b/oslo/messaging/_drivers/common.py +@@ -18,6 +18,7 @@ + import copy + import logging + import sys ++import time + import traceback + + import six +@@ -347,3 +348,27 @@ def deserialize_msg(msg): + raw_msg = jsonutils.loads(msg[_MESSAGE_KEY]) + + return raw_msg ++ ++ ++class DecayingTimer(object): ++ def __init__(self, duration=None): ++ self._duration = duration ++ self._ends_at = None ++ ++ def start(self): ++ if self._duration is not None: ++ self._ends_at = time.time() + max(0, self._duration) ++ ++ def check_return(self, timeout_callback, *args, **kwargs): ++ if self._duration is None: ++ return None ++ if self._ends_at is None: ++ raise RuntimeError(_("Can not check/return a timeout from a timer" ++ " that has not been started.")) ++ ++ maximum = kwargs.pop('maximum', None) ++ left = self._ends_at - time.time() ++ if left <= 0: ++ timeout_callback(*args, **kwargs) ++ ++ return left if maximum is None else min(left, maximum) +diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py +index 
bdb6693..c3197ac 100644 +--- a/oslo/messaging/_drivers/impl_rabbit.py ++++ b/oslo/messaging/_drivers/impl_rabbit.py +@@ -715,14 +715,18 @@ class Connection(object): + def iterconsume(self, limit=None, timeout=None): + """Return an iterator that will consume from all queues/consumers.""" + ++ timer = rpc_common.DecayingTimer(duration=timeout) ++ timer.start() ++ ++ def _raise_timeout(exc): ++ LOG.debug('Timed out waiting for RPC response: %s', exc) ++ raise rpc_common.Timeout() ++ + def _error_callback(exc): +- if isinstance(exc, socket.timeout): +- LOG.debug('Timed out waiting for RPC response: %s', exc) +- raise rpc_common.Timeout() +- else: +- LOG.exception(_('Failed to consume message from queue: %s'), +- exc) +- self.do_consume = True ++ timer.check_return(_raise_timeout, exc) ++ LOG.exception(_('Failed to consume message from queue: %s'), ++ exc) ++ self.do_consume = True + + def _consume(): + if self.do_consume: +@@ -732,7 +736,14 @@ class Connection(object): + queue.consume(nowait=True) + queues_tail.consume(nowait=False) + self.do_consume = False +- return self.connection.drain_events(timeout=timeout) ++ ++ poll_timeout = 1 if timeout is None else min(timeout, 1) ++ while True: ++ try: ++ return self.connection.drain_events(timeout=poll_timeout) ++ except socket.timeout as exc: ++ poll_timeout = timer.check_return(_raise_timeout, exc, ++ maximum=1) + + for iteration in itertools.count(0): + if limit and iteration >= limit: +-- +2.1.4 + diff -Nru oslo.messaging-1.4.1/debian/patches/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch oslo.messaging-1.4.1/debian/patches/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch --- oslo.messaging-1.4.1/debian/patches/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch 1969-12-31 17:00:00.000000000 -0700 +++ oslo.messaging-1.4.1/debian/patches/0002-Fix-_poll_connection-not-timeout-issue-2-2.patch 2015-06-25 16:21:39.000000000 -0700 @@ -0,0 +1,73 @@ +From 67e1445efae5431554d10dea00c0425f55f1d4cb Mon Sep 17 
00:00:00 2001 +From: Jie Li +Date: Wed, 4 Mar 2015 15:47:59 +0800 +Subject: [PATCH 2/4] Fix _poll_connection not timeout issue (2/2) + +Backport bugfix about the last cherry-pick. + +The title of the commit in master was: + rabbit: fix timeout timer when duration is None + +but this was changed since it didn't describe the actual change. + +Change-Id: I7f4cb3075f776c63aa7dc497173677f92b68c16d +(cherry picked from commit 44132d4344902f98007e6e58ea3bee56c701b400) +(cherry picked from commit 2bb31d179345802e50fb0ed0042e296a39b41585) +--- + oslo/messaging/_drivers/common.py | 4 ++-- + tests/test_utils.py | 15 +++++++++++++++ + 2 files changed, 17 insertions(+), 2 deletions(-) + +diff --git a/oslo/messaging/_drivers/common.py b/oslo/messaging/_drivers/common.py +index 24fee5e..a8137bd 100644 +--- a/oslo/messaging/_drivers/common.py ++++ b/oslo/messaging/_drivers/common.py +@@ -360,13 +360,13 @@ class DecayingTimer(object): + self._ends_at = time.time() + max(0, self._duration) + + def check_return(self, timeout_callback, *args, **kwargs): ++ maximum = kwargs.pop('maximum', None) + if self._duration is None: +- return None ++ return None if maximum is None else maximum + if self._ends_at is None: + raise RuntimeError(_("Can not check/return a timeout from a timer" + " that has not been started.")) + +- maximum = kwargs.pop('maximum', None) + left = self._ends_at - time.time() + if left <= 0: + timeout_callback(*args, **kwargs) +diff --git a/tests/test_utils.py b/tests/test_utils.py +index 1321009..38ee1d0 100644 +--- a/tests/test_utils.py ++++ b/tests/test_utils.py +@@ -13,6 +13,7 @@ + # License for the specific language governing permissions and limitations + # under the License. 
+ ++from oslo.messaging._drivers import common + from oslo.messaging import _utils as utils + from tests import utils as test_utils + +@@ -47,3 +48,17 @@ class VersionIsCompatibleTestCase(test_utils.BaseTestCase): + + def test_version_is_compatible_no_rev_is_zero(self): + self.assertTrue(utils.version_is_compatible('1.23.0', '1.23')) ++ ++ ++class TimerTestCase(test_utils.BaseTestCase): ++ def test_duration_is_none(self): ++ t = common.DecayingTimer() ++ t.start() ++ remaining = t.check_return(None) ++ self.assertEqual(None, remaining) ++ ++ def test_duration_is_none_and_maximun_set(self): ++ t = common.DecayingTimer() ++ t.start() ++ remaining = t.check_return(None, maximum=2) ++ self.assertEqual(2, remaining) +-- +2.1.4 + diff -Nru oslo.messaging-1.4.1/debian/patches/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch oslo.messaging-1.4.1/debian/patches/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch --- oslo.messaging-1.4.1/debian/patches/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch 1969-12-31 17:00:00.000000000 -0700 +++ oslo.messaging-1.4.1/debian/patches/0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch 2015-06-25 16:21:46.000000000 -0700 @@ -0,0 +1,34 @@ +From ba58d0ba5e059433009993c5035273e6389a01af Mon Sep 17 00:00:00 2001 +From: Mehdi Abaakouk +Date: Fri, 26 Sep 2014 22:28:17 +0200 +Subject: [PATCH 3/4] qpid: Always auto-delete queue of DirectConsumer + +In case of the call()er die, the queue used to reply must be +automatically deleted, otherwise they will never been deleted. + +This is done in the same way as the rabbit drivers does, +by enabling auto-delete on this queue. 
+ +Change-Id: Ib01981d704b8849e0115791ff90fbb74cbac421f +Closes-Bug: #1374519 +(cherry picked from commit 1640cc17031bde765d7e272c2e65993c4b270a45) +(cherry picked from commit c50cf0b58ff6b0129a7c4cda72e1b32e5303ea03) +--- + oslo/messaging/_drivers/impl_qpid.py | 1 - + 1 file changed, 1 deletion(-) + +diff --git a/oslo/messaging/_drivers/impl_qpid.py b/oslo/messaging/_drivers/impl_qpid.py +index f8984da..e99d0ab 100644 +--- a/oslo/messaging/_drivers/impl_qpid.py ++++ b/oslo/messaging/_drivers/impl_qpid.py +@@ -227,7 +227,6 @@ class DirectConsumer(ConsumerBase): + """ + + link_opts = { +- "auto-delete": conf.amqp_auto_delete, + "exclusive": True, + "durable": conf.amqp_durable_queues, + } +-- +2.1.4 + diff -Nru oslo.messaging-1.4.1/debian/patches/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch oslo.messaging-1.4.1/debian/patches/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch --- oslo.messaging-1.4.1/debian/patches/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch 1969-12-31 17:00:00.000000000 -0700 +++ oslo.messaging-1.4.1/debian/patches/0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch 2015-06-25 16:21:53.000000000 -0700 @@ -0,0 +1,86 @@ +From b78cc626194a0ccf25f82446e43ca6979149cb1d Mon Sep 17 00:00:00 2001 +From: Mehdi Abaakouk +Date: Tue, 5 May 2015 10:29:22 +0200 +Subject: [PATCH 4/4] rabbit: redeclare consumers when ack/requeue fail + +In case the acknowledgement or requeue of a message fail, +the kombu transport can be disconnected + +In this case, we must redeclare our consumers. + +This changes fixes that. + +This have no tests because the kombu memory transport we use in our tests +cannot be in disconnected state. + +Closes-bug: #1448650 + +Change-Id: I5991a4cf827411bc27c857561d97461212a17f40 +(cherry picked from commit 415db68b67368d7c8aa550e7108122200816e665) + +Conflicts are due to the refactoring to oslo_messaging namespace. 
+ +Conflicts: + oslo_messaging/_drivers/impl_rabbit.py + oslo_messaging/tests/drivers/test_impl_rabbit.py + +(cherry picked from commit 1fae6269bdfdab0360b91b9647d5a1f43c263e12) +(cherry picked from commit c30e2fbed4c6c558d575ad15ceaf90871623bd5b) +--- + oslo/messaging/_drivers/impl_rabbit.py | 7 +++++++ + tests/drivers/test_impl_rabbit.py | 21 +++++++++++++++++++++ + 2 files changed, 28 insertions(+) + +diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py +index c3197ac..3a419d3 100644 +--- a/oslo/messaging/_drivers/impl_rabbit.py ++++ b/oslo/messaging/_drivers/impl_rabbit.py +@@ -729,6 +729,13 @@ class Connection(object): + self.do_consume = True + + def _consume(): ++ # NOTE(sileht): in case the acknowledgement or requeue of a ++ # message fail, the kombu transport can be disconnected ++ # In this case, we must redeclare our consumers, so raise ++ # a recoverable error to trigger the reconnection code. ++ if not self.connection.connected: ++ raise self.connection.recoverable_connection_errors[0] ++ + if self.do_consume: + queues_head = self.consumers[:-1] # not fanout. 
+ queues_tail = self.consumers[-1] # fanout +diff --git a/tests/drivers/test_impl_rabbit.py b/tests/drivers/test_impl_rabbit.py +index 198252c..c53088b 100644 +--- a/tests/drivers/test_impl_rabbit.py ++++ b/tests/drivers/test_impl_rabbit.py +@@ -45,6 +45,27 @@ class TestRabbitDriverLoad(test_utils.BaseTestCase): + self.assertIsInstance(transport._driver, rabbit_driver.RabbitDriver) + + ++class TestRabbitConsume(test_utils.BaseTestCase): ++ ++ def setUp(self): ++ super(TestRabbitConsume, self).setUp() ++ self.messaging_conf.transport_driver = 'rabbit' ++ self.messaging_conf.in_memory = True ++ ++ def test_connection_ack_have_disconnected_kombu_connection(self): ++ transport = messaging.get_transport(self.conf) ++ self.addCleanup(transport.cleanup) ++ conn = transport._driver._get_connection().connection ++ channel = conn.channel ++ with mock.patch('kombu.connection.Connection.connected', ++ new_callable=mock.PropertyMock, ++ return_value=False): ++ self.assertRaises(driver_common.Timeout, ++ conn.consume, timeout=0.01) ++ # Ensure a new channel have been setuped ++ self.assertNotEqual(channel, conn.channel) ++ ++ + class TestRabbitTransportURL(test_utils.BaseTestCase): + + scenarios = [ +-- +2.1.4 + diff -Nru oslo.messaging-1.4.1/debian/patches/0005-Fix-possible-usage-of-undefined-variable.patch oslo.messaging-1.4.1/debian/patches/0005-Fix-possible-usage-of-undefined-variable.patch --- oslo.messaging-1.4.1/debian/patches/0005-Fix-possible-usage-of-undefined-variable.patch 1969-12-31 17:00:00.000000000 -0700 +++ oslo.messaging-1.4.1/debian/patches/0005-Fix-possible-usage-of-undefined-variable.patch 2015-06-25 16:22:02.000000000 -0700 @@ -0,0 +1,35 @@ +From 1e25b50c15fd3affddb6ca5c243b1bbedbdf2046 Mon Sep 17 00:00:00 2001 +From: "Denis V. Meltsaykin" +Date: Mon, 15 Jun 2015 17:33:36 +0300 +Subject: [PATCH 5/5] Fix possible usage of undefined variable + +In reconnect() the variable is defined after it's being used in +if-section. 
This could lead to undefined behavior while getting +timeout exception. + +Closes-Bug: #1465314 + +Change-Id: I97720f793728af2158152b7aafcc0ca9a45a6b07 +(cherry picked from commit 3ac14ae54881d47a96cb506ca3fcad39f245d42c) +--- + oslo/messaging/_drivers/impl_rabbit.py | 2 +- + 1 file changed, 1 insertion(+), 1 deletion(-) + +diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py +index 3a419d3..32a19e7 100644 +--- a/oslo/messaging/_drivers/impl_rabbit.py ++++ b/oslo/messaging/_drivers/impl_rabbit.py +@@ -615,9 +615,9 @@ class Connection(object): + # a protocol response. (See paste link in LP888621) + # So, we check all exceptions for 'timeout' in them + # and try to reconnect in this case. ++ e = ex + if 'timeout' not in six.text_type(e): + raise +- e = ex + + log_info = {} + log_info['err_str'] = e +-- +2.1.4 + diff -Nru oslo.messaging-1.4.1/debian/patches/series oslo.messaging-1.4.1/debian/patches/series --- oslo.messaging-1.4.1/debian/patches/series 2014-09-26 09:23:33.000000000 -0700 +++ oslo.messaging-1.4.1/debian/patches/series 2015-06-25 16:22:02.000000000 -0700 @@ -1 +1,6 @@ zmq-server-routing.patch +0001-Fix-_poll_connection-not-timeout-issue-1-2.patch +0002-Fix-_poll_connection-not-timeout-issue-2-2.patch +0003-qpid-Always-auto-delete-queue-of-DirectConsumer.patch +0004-rabbit-redeclare-consumers-when-ack-requeue-fail.patch +0005-Fix-possible-usage-of-undefined-variable.patch