diff -Nru oslo.messaging-1.3.0/debian/changelog oslo.messaging-1.3.0/debian/changelog --- oslo.messaging-1.3.0/debian/changelog 2015-06-25 06:01:10.000000000 -0300 +++ oslo.messaging-1.3.0/debian/changelog 2015-07-13 17:48:24.000000000 -0300 @@ -1,3 +1,14 @@ +oslo.messaging (1.3.0-0ubuntu1.3) trusty; urgency=medium + + * Backport various fixes for AMQP listener/executor. (LP: #1362863). + - Does not send a reply if msg_id is empty, + because the caller side should not expect one. + - Allows stopping the eventlet executor using eventlet.cancel. + - Closes the listener connection after processing all messages. + - Closes the listener connection when the executor finishes. + + -- Jorge Niedbalski Tue, 30 Jun 2015 13:30:54 -0300 + oslo.messaging (1.3.0-0ubuntu1.2) trusty; urgency=medium * Detect when underlying kombu connection to rabbitmq server has been diff -Nru oslo.messaging-1.3.0/debian/patches/0004-fix-lp-1362863.patch oslo.messaging-1.3.0/debian/patches/0004-fix-lp-1362863.patch --- oslo.messaging-1.3.0/debian/patches/0004-fix-lp-1362863.patch 1969-12-31 21:00:00.000000000 -0300 +++ oslo.messaging-1.3.0/debian/patches/0004-fix-lp-1362863.patch 2015-07-13 17:47:58.000000000 -0300 @@ -0,0 +1,291 @@ +From 07781e7d9de5dcb33202e5c4ce4701fc9bfc8fb6 Mon Sep 17 00:00:00 2001 +From: Jorge Niedbalski +Date: Mon, 22 Jun 2015 21:43:56 -0300 +Subject: [PATCH] Always use a poll timeout in the executor + +This change allows stopping the eventlet executor without +using eventlet.kill. +Also, the kombu docs recommend that drain_events +use a timeout of 1 second by default.
+Backport request of change: 137569 + +Also backports change: I674c0def1efb420c293897d49683593a0b10e291 + +Closes-Bug: #1362863 + +Change-Id: I3d4f187eb9537164fb1f3b9b3ab31645f452e371 +Signed-off-by: Jorge Niedbalski +--- + oslo/messaging/_drivers/amqpdriver.py | 23 ++++++++++++++++++++--- + oslo/messaging/_drivers/base.py | 26 ++++++++++++++++++++++---- + oslo/messaging/_drivers/impl_rabbit.py | 8 ++++++++ + oslo/messaging/_executors/impl_blocking.py | 17 +++++++++++++++-- + oslo/messaging/_executors/impl_eventlet.py | 14 +++++++++----- + oslo/messaging/server.py | 2 ++ + tests/test_executor.py | 14 ++++---------- + 7 files changed, 80 insertions(+), 24 deletions(-) + +diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py +index 6d395b7..b02ec95 100644 +--- a/oslo/messaging/_drivers/amqpdriver.py ++++ b/oslo/messaging/_drivers/amqpdriver.py +@@ -63,6 +63,11 @@ class AMQPIncomingMessage(base.IncomingMessage): + conn.direct_send(self.msg_id, rpc_common.serialize_msg(msg)) + + def reply(self, reply=None, failure=None, log_failure=True): ++ if not self.msg_id: ++ # NOTE(Alexei_987) not sending reply, if msg_id is empty ++ # because reply should not be expected by caller side ++ return ++ + with self.listener.driver._get_connection() as conn: + self._send_reply(conn, reply, failure, log_failure=log_failure) + self._send_reply(conn, ending=True) +@@ -88,6 +93,7 @@ class AMQPListener(base.Listener): + self.conn = conn + self.msg_id_cache = rpc_amqp._MsgIdCache() + self.incoming = [] ++ self._stopped = threading.Event() + + def __call__(self, message): + # FIXME(markmc): logging isn't driver specific +@@ -103,11 +109,22 @@ class AMQPListener(base.Listener): + ctxt.msg_id, + ctxt.reply_q)) + +- def poll(self): +- while True: ++ def poll(self, timeout=None): ++ while not self._stopped.is_set(): + if self.incoming: + return self.incoming.pop(0) +- self.conn.consume(limit=1) ++ try: ++ self.conn.consume(limit=1, timeout=timeout) ++ except 
rpc_common.Timeout: ++ return None ++ ++ def stop(self): ++ self._stopped.set() ++ self.conn.stop_consuming() ++ ++ def cleanup(self): ++ # Closes listener connection ++ self.conn.close() + + + class ReplyWaiters(object): +diff --git a/oslo/messaging/_drivers/base.py b/oslo/messaging/_drivers/base.py +index 82b3641..ffaebe2 100644 +--- a/oslo/messaging/_drivers/base.py ++++ b/oslo/messaging/_drivers/base.py +@@ -53,19 +53,37 @@ class Listener(object): + self.driver = driver + + @abc.abstractmethod +- def poll(self): +- "Blocking until a message is pending and return IncomingMessage." ++ def poll(self, timeout=None): ++ """Block until a message is pending and return an IncomingMessage. ++ Return None after timeout seconds if timeout is set and no message is ++ pending or if the listener has been stopped. ++ """ ++ ++ def stop(self): ++ """Stop listener. ++ Stop the listener message polling ++ """ ++ pass ++ ++ def cleanup(self): ++ """Cleanup listener. ++ Close connection used by listener if any. For some listeners like ++ zmq there is no connection so no need to close connection. ++ As this is a listener-specific method, override it in the derived class ++ if listener cleanup is required.
++ """ ++ pass + + + @six.add_metaclass(abc.ABCMeta) + class BaseDriver(object): + + def __init__(self, conf, url, +- default_exchange=None, allowed_remote_exmods=[]): ++ default_exchange=None, allowed_remote_exmods=None): + self.conf = conf + self._url = url + self._default_exchange = default_exchange +- self._allowed_remote_exmods = allowed_remote_exmods ++ self._allowed_remote_exmods = allowed_remote_exmods or [] + + def require_features(self, requeue=False): + if requeue: +diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py +index 0182800..0470281 100644 +--- a/oslo/messaging/_drivers/impl_rabbit.py ++++ b/oslo/messaging/_drivers/impl_rabbit.py +@@ -474,6 +474,8 @@ class Connection(object): + + self.connection = None + self.do_consume = None ++ self._consume_loop_stopped = False ++ + self.reconnect() + + # FIXME(markmc): use oslo sslutils when it is available as a library +@@ -715,6 +717,9 @@ class Connection(object): + + poll_timeout = 1 if timeout is None else min(timeout, 1) + while True: ++ if self._consume_loop_stopped: ++ self._consume_loop_stopped = False ++ raise StopIteration + try: + return self.connection.drain_events(timeout=poll_timeout) + except socket.timeout as exc: +@@ -809,6 +814,9 @@ class Connection(object): + except StopIteration: + return + ++ def stop_consuming(self): ++ self._consume_loop_stopped = True ++ + + class RabbitDriver(amqpdriver.AMQPDriverBase): + +diff --git a/oslo/messaging/_executors/impl_blocking.py b/oslo/messaging/_executors/impl_blocking.py +index 8e463a0..ba79e0f 100644 +--- a/oslo/messaging/_executors/impl_blocking.py ++++ b/oslo/messaging/_executors/impl_blocking.py +@@ -13,8 +13,15 @@ + # License for the specific language governing permissions and limitations + # under the License. 
+ ++import logging ++ + from oslo.messaging._executors import base + ++# TODO: Stand-in for the missing i18n translation function ++_ = lambda s: s ++ ++LOG = logging.getLogger(__name__) ++ + + class BlockingExecutor(base.ExecutorBase): + +@@ -36,11 +43,17 @@ class BlockingExecutor(base.ExecutorBase): + def start(self): + self._running = True + while self._running: +- with self.dispatcher(self.listener.poll()) as callback: +- callback() ++ try: ++ incoming = self.listener.poll() ++ if incoming is not None: ++ with self.dispatcher(incoming) as callback: ++ callback() ++ except Exception: ++ LOG.exception(_("Unexpected exception occurred.")) + + def stop(self): + self._running = False ++ self.listener.stop() + + def wait(self): + pass +diff --git a/oslo/messaging/_executors/impl_eventlet.py b/oslo/messaging/_executors/impl_eventlet.py +index fb70dff..06ecafc 100644 +--- a/oslo/messaging/_executors/impl_eventlet.py ++++ b/oslo/messaging/_executors/impl_eventlet.py +@@ -20,7 +20,6 @@ from eventlet import greenpool + import greenlet + + from oslo.config import cfg +- + from oslo.messaging._executors import base + from oslo.messaging.openstack.common import excutils + +@@ -76,6 +75,7 @@ class EventletExecutor(base.ExecutorBase): + self.conf.register_opts(_eventlet_opts) + self._thread = None + self._greenpool = greenpool.GreenPool(self.conf.rpc_thread_pool_size) ++ self._running = False + + def start(self): + if self._thread is not None: +@@ -84,19 +84,23 @@ class EventletExecutor(base.ExecutorBase): + @excutils.forever_retry_uncaught_exceptions + def _executor_thread(): + try: +- while True: ++ while self._running: + incoming = self.listener.poll() +- spawn_with(ctxt=self.dispatcher(incoming), +- pool=self._greenpool) ++ if incoming is not None: ++ spawn_with(ctxt=self.dispatcher(incoming), ++ pool=self._greenpool) + except greenlet.GreenletExit: + return + ++ self._running = True + self._thread = eventlet.spawn(_executor_thread) + + def stop(self): + if self._thread is None: +
return +- self._thread.kill() ++ self._running = False ++ self.listener.stop() ++ self._thread.cancel() + + def wait(self): + if self._thread is None: +diff --git a/oslo/messaging/server.py b/oslo/messaging/server.py +index bc96c57..94f4484 100644 +--- a/oslo/messaging/server.py ++++ b/oslo/messaging/server.py +@@ -141,4 +141,6 @@ class MessageHandlingServer(object): + """ + if self._executor is not None: + self._executor.wait() ++ # Close listener connection after processing all messages ++ self._executor.listener.cleanup() + self._executor = None +diff --git a/tests/test_executor.py b/tests/test_executor.py +index 509ad0c..9996c88 100644 +--- a/tests/test_executor.py ++++ b/tests/test_executor.py +@@ -30,10 +30,8 @@ load_tests = testscenarios.load_tests_apply_scenarios + + class TestExecutor(test_utils.BaseTestCase): + +- _impl = [('blocking', dict(executor=impl_blocking.BlockingExecutor, +- stop_before_return=True)), +- ('eventlet', dict(executor=impl_eventlet.EventletExecutor, +- stop_before_return=False))] ++ _impl = [('blocking', dict(executor=impl_blocking.BlockingExecutor)), ++ ('eventlet', dict(executor=impl_eventlet.EventletExecutor))] + + @classmethod + def generate_scenarios(cls): +@@ -64,13 +62,9 @@ class TestExecutor(test_utils.BaseTestCase): + message={'payload': 'data'}) + + def fake_poll(): +- if self.stop_before_return: +- executor.stop() ++ if listener.poll.call_count == 1: + return incoming_message +- else: +- if listener.poll.call_count == 1: +- return incoming_message +- executor.stop() ++ executor.stop() + + listener.poll.side_effect = fake_poll + +-- +2.1.4 + diff -Nru oslo.messaging-1.3.0/debian/patches/redeclare-consumers-when-ack-requeue-fails.patch oslo.messaging-1.3.0/debian/patches/redeclare-consumers-when-ack-requeue-fails.patch --- oslo.messaging-1.3.0/debian/patches/redeclare-consumers-when-ack-requeue-fails.patch 2015-06-25 05:59:40.000000000 -0300 +++ 
oslo.messaging-1.3.0/debian/patches/redeclare-consumers-when-ack-requeue-fails.patch 2015-07-13 17:47:58.000000000 -0300 @@ -25,11 +25,11 @@ oslo/messaging/_drivers/impl_rabbit.py | 7 +++++++ 1 file changed, 7 insertions(+) -diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py -index 0182800..7cbff78 100644 ---- a/oslo/messaging/_drivers/impl_rabbit.py -+++ b/oslo/messaging/_drivers/impl_rabbit.py -@@ -705,6 +705,13 @@ class Connection(object): +Index: oslo.messaging-1.3.0/oslo/messaging/_drivers/impl_rabbit.py +=================================================================== +--- oslo.messaging-1.3.0.orig/oslo/messaging/_drivers/impl_rabbit.py ++++ oslo.messaging-1.3.0/oslo/messaging/_drivers/impl_rabbit.py +@@ -696,6 +696,13 @@ class Connection(object): exc) def _consume(): @@ -43,6 +43,3 @@ if self.do_consume: queues_head = self.consumers[:-1] # not fanout. queues_tail = self.consumers[-1] # fanout --- -2.1.4 - diff -Nru oslo.messaging-1.3.0/debian/patches/series oslo.messaging-1.3.0/debian/patches/series --- oslo.messaging-1.3.0/debian/patches/series 2015-06-25 05:59:40.000000000 -0300 +++ oslo.messaging-1.3.0/debian/patches/series 2015-07-13 17:47:58.000000000 -0300 @@ -3,3 +3,4 @@ 0002-rabbit-fix-timeout-timer-when-duration-is-None.patch 0003-Declare-DirectPublisher-exchanges-with-passive-True.patch redeclare-consumers-when-ack-requeue-fails.patch +0004-fix-lp-1362863.patch