diff -Nru oslo.messaging-1.3.0/debian/changelog oslo.messaging-1.3.0/debian/changelog --- oslo.messaging-1.3.0/debian/changelog 2014-04-01 15:29:27.000000000 +0100 +++ oslo.messaging-1.3.0/debian/changelog 2015-04-16 17:44:44.000000000 +0100 @@ -1,3 +1,13 @@ +oslo.messaging (1.3.0-0ubuntu1hf1338732v20150415.1) trusty; urgency=medium + + [ hopem ] + * Backport fixes for AMQP reconnect support + * 0001-rabbit-more-precise-iterconsume-timeout.patch (LP: #1400268) + * 0002-rabbit-fix-timeout-timer-when-duration-is-None.patch (LP: #1408370) + * 0003-Declare-DirectPublisher-exchanges-with-passive-True.patch (LP: #1338732) + + -- Edward Hope-Morley Thu, 16 Apr 2015 17:39:20 +0100 + oslo.messaging (1.3.0-0ubuntu1) trusty; urgency=medium * New upstream release. (LP: #1300804) diff -Nru oslo.messaging-1.3.0/debian/patches/0001-rabbit-more-precise-iterconsume-timeout.patch oslo.messaging-1.3.0/debian/patches/0001-rabbit-more-precise-iterconsume-timeout.patch --- oslo.messaging-1.3.0/debian/patches/0001-rabbit-more-precise-iterconsume-timeout.patch 1970-01-01 01:00:00.000000000 +0100 +++ oslo.messaging-1.3.0/debian/patches/0001-rabbit-more-precise-iterconsume-timeout.patch 2015-04-16 17:36:29.000000000 +0100 @@ -0,0 +1,191 @@ +From b58180210145e1c804ec496576d6bb2caabc68ef Mon Sep 17 00:00:00 2001 +From: Mehdi Abaakouk +Date: Mon, 8 Dec 2014 10:56:52 +0100 +Subject: [PATCH 1/3] rabbit: more precise iterconsume timeout + +The iterconsume always set the timeout of kombu to 1 second +even the requested timeout more precise or < 1 second. + +This change fixes that. 
+ +Related bug: #1400268 +Related bug: #1399257 +Related-bug: #1338732 + +(cherry picked from commit 023b7f44e2ccd77a7e9ee9ee78431a4646c88f13) + +Conflicts: + oslo/messaging/_drivers/amqpdriver.py + oslo/messaging/_drivers/impl_rabbit.py + +Change-Id: I157dab80cdb4afcf9a5f26fa900f96f0696db502 +--- + oslo/messaging/_drivers/amqpdriver.py | 23 +++++++++++++++-------- + oslo/messaging/_drivers/common.py | 26 ++++++++++++++++++++++++++ + oslo/messaging/_drivers/impl_rabbit.py | 27 ++++++++++++++++++--------- + 3 files changed, 59 insertions(+), 17 deletions(-) + +diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py +index 3a2303d..6d395b7 100644 +--- a/oslo/messaging/_drivers/amqpdriver.py ++++ b/oslo/messaging/_drivers/amqpdriver.py +@@ -184,6 +184,11 @@ class ReplyWaiter(object): + def unlisten(self, msg_id): + self.waiters.remove(msg_id) + ++ @staticmethod ++ def _raise_timeout_exception(msg_id): ++ raise messaging.MessagingTimeout( ++ 'Timed out waiting for a reply to message ID %s' % msg_id) ++ + def _process_reply(self, data): + result = None + ending = False +@@ -198,7 +203,7 @@ class ReplyWaiter(object): + result = data['result'] + return result, ending + +- def _poll_connection(self, msg_id, timeout): ++ def _poll_connection(self, msg_id, timer): + while True: + while self.incoming: + message_data = self.incoming.pop(0) +@@ -209,15 +214,15 @@ class ReplyWaiter(object): + + self.waiters.put(incoming_msg_id, message_data) + ++ timeout = timer.check_return(self._raise_timeout_exception, msg_id) + try: + self.conn.consume(limit=1, timeout=timeout) + except rpc_common.Timeout: +- raise messaging.MessagingTimeout('Timed out waiting for a ' +- 'reply to message ID %s' +- % msg_id) ++ self._raise_timeout_exception(msg_id) + +- def _poll_queue(self, msg_id, timeout): +- message = self.waiters.get(msg_id, timeout) ++ def _poll_queue(self, msg_id, timer): ++ timeout = timer.check_return(self._raise_timeout_exception, msg_id) ++ 
message = self.waiters.get(msg_id, timeout=timeout) + if message is self.waiters.WAKE_UP: + return None, None, True # lock was released + +@@ -246,6 +251,8 @@ class ReplyWaiter(object): + # have the first thread take responsibility for passing replies not + # intended for itself to the appropriate thread. + # ++ timer = rpc_common.DecayingTimer(duration=timeout) ++ timer.start() + final_reply = None + while True: + if self.conn_lock.acquire(False): +@@ -264,7 +271,7 @@ class ReplyWaiter(object): + + # Now actually poll the connection + while True: +- reply, ending = self._poll_connection(msg_id, timeout) ++ reply, ending = self._poll_connection(msg_id, timer) + if not ending: + final_reply = reply + else: +@@ -277,7 +284,7 @@ class ReplyWaiter(object): + self.waiters.wake_all(msg_id) + else: + # We're going to wait for the first thread to pass us our reply +- reply, ending, trylock = self._poll_queue(msg_id, timeout) ++ reply, ending, trylock = self._poll_queue(msg_id, timer) + if trylock: + # The first thread got its reply, let's try and take over + # the responsibility for polling +diff --git a/oslo/messaging/_drivers/common.py b/oslo/messaging/_drivers/common.py +index e4c18e5..4e314b9 100644 +--- a/oslo/messaging/_drivers/common.py ++++ b/oslo/messaging/_drivers/common.py +@@ -18,6 +18,7 @@ + import copy + import logging + import sys ++import time + import traceback + + from oslo.config import cfg +@@ -507,3 +508,28 @@ def deserialize_msg(msg): + raw_msg = jsonutils.loads(msg[_MESSAGE_KEY]) + + return raw_msg ++ ++ ++class DecayingTimer(object): ++ def __init__(self, duration=None): ++ self._duration = duration ++ self._ends_at = None ++ ++ def start(self): ++ if self._duration is not None: ++ self._ends_at = time.time() + max(0, self._duration) ++ return self ++ ++ def check_return(self, timeout_callback, *args, **kwargs): ++ if self._duration is None: ++ return None ++ if self._ends_at is None: ++ raise RuntimeError("Can not check/return a timeout from a 
timer" ++ " that has not been started") ++ ++ maximum = kwargs.pop('maximum', None) ++ left = self._ends_at - time.time() ++ if left <= 0: ++ timeout_callback(*args, **kwargs) ++ ++ return left if maximum is None else min(left, maximum) +diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py +index 9df6a17..0114951 100644 +--- a/oslo/messaging/_drivers/impl_rabbit.py ++++ b/oslo/messaging/_drivers/impl_rabbit.py +@@ -689,16 +689,18 @@ class Connection(object): + + def iterconsume(self, limit=None, timeout=None): + """Return an iterator that will consume from all queues/consumers.""" ++ timer = rpc_common.DecayingTimer(duration=timeout) ++ timer.start() ++ ++ def _raise_timeout(exc): ++ LOG.debug('Timed out waiting for RPC response: %s', exc) ++ raise rpc_common.Timeout() + + def _error_callback(exc): +- if isinstance(exc, socket.timeout): +- LOG.debug(_('Timed out waiting for RPC response: %s') % +- str(exc)) +- raise rpc_common.Timeout() +- else: +- LOG.exception(_('Failed to consume message from queue: %s') % +- str(exc)) +- self.do_consume = True ++ self.do_consume = True ++ timer.check_return(_raise_timeout, exc) ++ LOG.exception(_('Failed to consume message from queue: %s'), ++ exc) + + def _consume(): + if self.do_consume: +@@ -708,7 +710,14 @@ class Connection(object): + queue.consume(nowait=True) + queues_tail.consume(nowait=False) + self.do_consume = False +- return self.connection.drain_events(timeout=timeout) ++ ++ poll_timeout = 1 if timeout is None else min(timeout, 1) ++ while True: ++ try: ++ return self.connection.drain_events(timeout=poll_timeout) ++ except socket.timeout as exc: ++ poll_timeout = timer.check_return(_raise_timeout, exc, ++ maximum=1) + + for iteration in itertools.count(0): + if limit and iteration >= limit: +-- +1.9.1 + diff -Nru oslo.messaging-1.3.0/debian/patches/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch 
oslo.messaging-1.3.0/debian/patches/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch --- oslo.messaging-1.3.0/debian/patches/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch 1970-01-01 01:00:00.000000000 +0100 +++ oslo.messaging-1.3.0/debian/patches/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch 2015-04-16 17:36:52.000000000 +0100 @@ -0,0 +1,73 @@ +From 106dd46101f536359df984394b5994f911df4912 Mon Sep 17 00:00:00 2001 +From: Mehdi Abaakouk +Date: Wed, 7 Jan 2015 15:49:54 +0100 +Subject: [PATCH 2/3] rabbit: fix timeout timer when duration is None + +When the duration of the timeout timer used in the rabbit driver is +None and we want the timer to return a maximum of N secs, it returns None +(infinite) instead of N. + +This change fixes that. + +Closes-bug: #1408370 + +(cherry picked from commit 44132d4344902f98007e6e58ea3bee56c701b400) +Change-Id: I7f4cb3075f776c63aa7dc497173677f92b68c16d +--- + oslo/messaging/_drivers/common.py | 4 ++-- + tests/test_utils.py | 15 +++++++++++++++ + 2 files changed, 17 insertions(+), 2 deletions(-) + +diff --git a/oslo/messaging/_drivers/common.py b/oslo/messaging/_drivers/common.py +index 4e314b9..49fd712 100644 +--- a/oslo/messaging/_drivers/common.py ++++ b/oslo/messaging/_drivers/common.py +@@ -521,13 +521,13 @@ class DecayingTimer(object): + return self + + def check_return(self, timeout_callback, *args, **kwargs): ++ maximum = kwargs.pop('maximum', None) + if self._duration is None: +- return None ++ return None if maximum is None else maximum + if self._ends_at is None: + raise RuntimeError("Can not check/return a timeout from a timer" + " that has not been started") + +- maximum = kwargs.pop('maximum', None) + left = self._ends_at - time.time() + if left <= 0: + timeout_callback(*args, **kwargs) +diff --git a/tests/test_utils.py b/tests/test_utils.py +index 1321009..38ee1d0 100644 +--- a/tests/test_utils.py ++++ b/tests/test_utils.py +@@ -13,6 +13,7 @@ + # License for the specific language 
governing permissions and limitations + # under the License. + ++from oslo.messaging._drivers import common + from oslo.messaging import _utils as utils + from tests import utils as test_utils + +@@ -47,3 +48,17 @@ class VersionIsCompatibleTestCase(test_utils.BaseTestCase): + + def test_version_is_compatible_no_rev_is_zero(self): + self.assertTrue(utils.version_is_compatible('1.23.0', '1.23')) ++ ++ ++class TimerTestCase(test_utils.BaseTestCase): ++ def test_duration_is_none(self): ++ t = common.DecayingTimer() ++ t.start() ++ remaining = t.check_return(None) ++ self.assertEqual(None, remaining) ++ ++ def test_duration_is_none_and_maximun_set(self): ++ t = common.DecayingTimer() ++ t.start() ++ remaining = t.check_return(None, maximum=2) ++ self.assertEqual(2, remaining) +-- +1.9.1 + diff -Nru oslo.messaging-1.3.0/debian/patches/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch oslo.messaging-1.3.0/debian/patches/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch --- oslo.messaging-1.3.0/debian/patches/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch 1970-01-01 01:00:00.000000000 +0100 +++ oslo.messaging-1.3.0/debian/patches/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch 2015-04-16 17:37:10.000000000 +0100 @@ -0,0 +1,190 @@ +From 33f3b18539da88ddf984ada4b63e2767d64438f9 Mon Sep 17 00:00:00 2001 +From: Mehdi Abaakouk +Date: Tue, 22 Jul 2014 09:42:52 -0700 +Subject: [PATCH 3/3] Declare DirectPublisher exchanges with passive=True + +If rabbit dies, the consumer can be disconnected before the publisher +sends, and if the consumer hasn't declared the queue, the publisher +will send a message to an exchange that's not bound to a queue, and +the message will be lost. Setting passive=True will cause the +publisher to fail and retry if the consumer hasn't declared the +receiving queue yet. 
+ +Co-Authored-By: Noel Burton-Krahn +Closes-Bug: #1338732 +(cherry picked from commit 434b5c8781b36cd65b7b642d723c0502e7093795) + +Conflicts: + oslo_messaging/_drivers/common.py + oslo_messaging/tests/test_utils.py + +Change-Id: I5ba4d311b97236d3a85a9f5badff61f12b08c12d +--- + oslo/messaging/_drivers/common.py | 4 ++-- + oslo/messaging/_drivers/impl_rabbit.py | 30 ++++++++++++++++++++++++++++-- + tests/test_rabbit.py | 23 +++++++++++++++++++++-- + tests/test_utils.py | 26 ++++++++++++++++++++++---- + 4 files changed, 73 insertions(+), 10 deletions(-) + +diff --git a/oslo/messaging/_drivers/common.py b/oslo/messaging/_drivers/common.py +index 49fd712..6d5f1fb 100644 +--- a/oslo/messaging/_drivers/common.py ++++ b/oslo/messaging/_drivers/common.py +@@ -520,7 +520,7 @@ class DecayingTimer(object): + self._ends_at = time.time() + max(0, self._duration) + return self + +- def check_return(self, timeout_callback, *args, **kwargs): ++ def check_return(self, timeout_callback=None, *args, **kwargs): + maximum = kwargs.pop('maximum', None) + if self._duration is None: + return None if maximum is None else maximum +@@ -529,7 +529,7 @@ class DecayingTimer(object): + " that has not been started") + + left = self._ends_at - time.time() +- if left <= 0: ++ if left <= 0 and timeout_callback is not None: + timeout_callback(*args, **kwargs) + + return left if maximum is None else min(left, maximum) +diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py +index 0114951..0182800 100644 +--- a/oslo/messaging/_drivers/impl_rabbit.py ++++ b/oslo/messaging/_drivers/impl_rabbit.py +@@ -354,7 +354,8 @@ class DirectPublisher(Publisher): + + options = {'durable': False, + 'auto_delete': True, +- 'exclusive': False} ++ 'exclusive': False, ++ 'passive': True} + options.update(kwargs) + super(DirectPublisher, self).__init__(channel, msg_id, msg_id, + type='direct', **options) +@@ -370,6 +371,7 @@ class TopicPublisher(Publisher): + options = {'durable': 
conf.amqp_durable_queues, + 'auto_delete': conf.amqp_auto_delete, + 'exclusive': False} ++ + options.update(kwargs) + exchange_name = rpc_amqp.get_control_exchange(conf) + super(TopicPublisher, self).__init__(channel, +@@ -760,7 +762,31 @@ class Connection(object): + + def direct_send(self, msg_id, msg): + """Send a 'direct' message.""" +- self.publisher_send(DirectPublisher, msg_id, msg) ++ ++ timer = rpc_common.DecayingTimer(duration=60) ++ timer.start() ++ # NOTE(sileht): retry at least 60sec, after we have a good change ++ # that the caller is really dead too... ++ ++ while True: ++ try: ++ self.publisher_send(DirectPublisher, msg_id, msg) ++ except self.connection.channel_errors as exc: ++ # NOTE(noelbk/sileht): ++ # If rabbit dies, the consumer can be disconnected before the ++ # publisher sends, and if the consumer hasn't declared the ++ # queue, the publisher's will send a message to an exchange ++ # that's not bound to a queue, and the message wll be lost. ++ # So we set passive=True to the publisher exchange and catch ++ # the 404 kombu ChannelError and retry until the exchange ++ # appears ++ if exc.code == 404 and timer.check_return() > 0: ++ LOG.info(_("The exchange to reply to %s doesn't " ++ "exist yet, retrying...") % msg_id) ++ time.sleep(1) ++ continue ++ raise ++ return + + def topic_send(self, topic, msg, timeout=None): + """Send a 'topic' message.""" +diff --git a/tests/test_rabbit.py b/tests/test_rabbit.py +index a680697..88c8481 100644 +--- a/tests/test_rabbit.py ++++ b/tests/test_rabbit.py +@@ -221,8 +221,27 @@ class TestSendReceive(test_utils.BaseTestCase): + raise ZeroDivisionError + except Exception: + failure = sys.exc_info() +- msgs[i].reply(failure=failure, +- log_failure=not self.expected) ++ ++ # NOTE(noelbk) confirm that Publisher exchanges ++ # are always declared with passive=True ++ outer_self = self ++ test_exchange_was_called = [False] ++ old_init = kombu.entity.Exchange.__init__ ++ ++ def new_init(self, *args, **kwargs): ++ 
test_exchange_was_called[0] = True ++ outer_self.assertTrue(kwargs['passive']) ++ old_init(self, *args, **kwargs) ++ kombu.entity.Exchange.__init__ = new_init ++ ++ try: ++ msgs[i].reply(failure=failure, ++ log_failure=not self.expected) ++ finally: ++ kombu.entity.Exchange.__init__ = old_init ++ ++ self.assertTrue(test_exchange_was_called[0]) ++ + elif self.rx_id: + msgs[i].reply({'rx_id': i}) + else: +diff --git a/tests/test_utils.py b/tests/test_utils.py +index 38ee1d0..ad5fb0d 100644 +--- a/tests/test_utils.py ++++ b/tests/test_utils.py +@@ -13,6 +13,10 @@ + # License for the specific language governing permissions and limitations + # under the License. + ++import time ++ ++import mock ++ + from oslo.messaging._drivers import common + from oslo.messaging import _utils as utils + from tests import utils as test_utils +@@ -51,14 +55,28 @@ class VersionIsCompatibleTestCase(test_utils.BaseTestCase): + + + class TimerTestCase(test_utils.BaseTestCase): +- def test_duration_is_none(self): ++ def test_no_duration_no_callback(self): + t = common.DecayingTimer() + t.start() +- remaining = t.check_return(None) ++ remaining = t.check_return() + self.assertEqual(None, remaining) + +- def test_duration_is_none_and_maximun_set(self): ++ def test_no_duration_but_maximun(self): + t = common.DecayingTimer() + t.start() +- remaining = t.check_return(None, maximum=2) ++ remaining = t.check_return(maximum=2) + self.assertEqual(2, remaining) ++ ++ def test_duration_expired_no_callback(self): ++ t = common.DecayingTimer(2) ++ t._ends_at = time.time() - 10 ++ remaining = t.check_return() ++ self.assertAlmostEqual(-10, remaining, 0) ++ ++ def test_duration_callback(self): ++ t = common.DecayingTimer(2) ++ t._ends_at = time.time() - 10 ++ callback = mock.Mock() ++ remaining = t.check_return(callback) ++ self.assertAlmostEqual(-10, remaining, 0) ++ callback.assert_called_once +-- +1.9.1 + diff -Nru oslo.messaging-1.3.0/debian/patches/series oslo.messaging-1.3.0/debian/patches/series --- 
oslo.messaging-1.3.0/debian/patches/series 2014-04-01 15:27:47.000000000 +0100 +++ oslo.messaging-1.3.0/debian/patches/series 2015-04-16 17:37:10.000000000 +0100 @@ -1 +1,4 @@ skip-qpid-tests.patch +0001-rabbit-more-precise-iterconsume-timeout.patch +0002-rabbit-fix-timeout-timer-when-duration-is-None.patch +0003-Declare-DirectPublisher-exchanges-with-passive-True.patch