commit 737ba36162b9f50755a2488b925be991eb3733ae Author: Alexei Kornienko Date: Thu Jul 10 12:08:29 2014 +0300 Added heartbeat check to impl_rabbit Change-Id: I1aac826a04a7c887af6be47840dbde7fe35d4d7b diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py index 84f7cbf..bb5082a 100644 --- a/oslo/messaging/_drivers/impl_rabbit.py +++ b/oslo/messaging/_drivers/impl_rabbit.py @@ -18,6 +18,7 @@ import logging import random import socket import ssl +import threading import time import uuid @@ -101,6 +102,10 @@ rabbit_opts = [ 'If you change this option, you must wipe the ' 'RabbitMQ database.'), + cfg.IntOpt('rabbit_heartbeat', + default=5, + help='seconds between rabbit keep-alive heartbeat.'), + # FIXME(markmc): this was toplevel in openstack.common.rpc cfg.BoolOpt('fake_rabbit', default=False, @@ -453,7 +458,8 @@ class Connection(object): 'userid': host.username or '', 'password': host.password or '', 'login_method': self.conf.rabbit_login_method, - 'virtual_host': virtual_host + 'virtual_host': virtual_host, + 'heartbeat': self.conf.rabbit_heartbeat, } if self.conf.fake_rabbit: params['transport'] = 'memory' @@ -473,7 +479,8 @@ class Connection(object): 'userid': self.conf.rabbit_userid, 'password': self.conf.rabbit_password, 'login_method': self.conf.rabbit_login_method, - 'virtual_host': virtual_host + 'virtual_host': virtual_host, + 'heartbeat': self.conf.rabbit_heartbeat, } if self.conf.fake_rabbit: @@ -489,6 +496,7 @@ class Connection(object): self.memory_transport = self.conf.fake_rabbit self.connection = None + self.do_consume = None self.reconnect() @@ -579,6 +587,9 @@ class Connection(object): pass self.connection = None + def send_heartbeat(self): + self.connection.heartbeat_check() + def reconnect(self, retry=None): """Handles reconnecting and re-establishing queues. Will retry up to retry number of times. @@ -803,6 +814,30 @@ class Connection(object): return +class RabbitListener(amqpdriver.AMQPListener): + + def poll(self, timeout=None): + if timeout is not None: + deadline = time.time() + timeout + else: + deadline = None + while True: + if self.incoming: + return self.incoming.pop(0) + try: + self.conn.send_heartbeat() + except Exception: + self.conn.reconnect() + if deadline is not None: + timeout = deadline - time.time() + if timeout < 0: + return None + try: + self.conn.consume(limit=1, timeout=1) + except rpc_common.Timeout: + continue + + class RabbitDriver(amqpdriver.AMQPDriverBase): def __init__(self, conf, url, @@ -820,3 +855,30 @@ class RabbitDriver(amqpdriver.AMQPDriverBase): def require_features(self, requeue=True): pass + + def listen(self, target): + conn = self._get_connection(pooled=False) + + listener = RabbitListener(self, conn) + + conn.declare_topic_consumer(exchange_name=self._get_exchange(target), + topic=target.topic, + callback=listener) + conn.declare_topic_consumer(exchange_name=self._get_exchange(target), + topic='%s.%s' % (target.topic, + target.server), + callback=listener) + conn.declare_fanout_consumer(target.topic, listener) + + return listener + + def listen_for_notifications(self, targets_and_priorities): + conn = self._get_connection(pooled=False) + + listener = RabbitListener(self, conn) + for target, priority in targets_and_priorities: + conn.declare_topic_consumer( + exchange_name=self._get_exchange(target), + topic='%s.%s' % (target.topic, priority), + callback=listener) + return listener