rabbit: duplicated consumer tags

Bug #1740984 reported by hebin
This bug affects 1 person
Affects          Status   Importance   Assigned to   Milestone
oslo.messaging   Expired  Undecided    Unassigned

Bug Description

When more than 1500 compute nodes connect to a single RabbitMQ broker simultaneously, I see messaging errors on a few of the compute nodes. The error reads: Failed to consume message from queue: Basic.consume: (530) NOT_ALLOWED - attempt to reuse consumer tag '1'. I think this means the tags of the consumers held in self.consumers are duplicated. Why does this happen?
When many compute nodes connect to the RabbitMQ broker at once (more than 1500 in my case), the RabbitMQ server struggles to process the incoming TCP connection requests. A compute node may first establish a connection with the broker successfully and then have that connection closed by the RabbitMQ server because of its heavy workload.
When the RPC server of the nova-compute service starts, the rabbit connection object declares three consumers and stores them in self.consumers:

    def listen(self, target):
        conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN)

        listener = AMQPListener(self, conn)

        conn.declare_topic_consumer(exchange_name=self._get_exchange(target),  # 1
                                    topic=target.topic,
                                    callback=listener)
        conn.declare_topic_consumer(exchange_name=self._get_exchange(target),  # 2
                                    topic='%s.%s' % (target.topic,
                                                     target.server),
                                    callback=listener)
        conn.declare_fanout_consumer(target.topic, listener)  # 3

        return listener
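To make the tag bookkeeping concrete, here is a minimal sketch of how a connection that numbers its consumers from a shared itertools.count would tag these three declarations. ToyConnection, declare_consumer and the queue labels are hypothetical stand-ins, not the oslo.messaging API; the only assumption carried over from the report is that tags come from consumer_num and that every consumer is kept in self.consumers.

```python
import itertools


class ToyConnection:
    """Hypothetical stand-in for the rabbit connection object: tags come
    from a shared itertools.count and every consumer is kept in a list."""

    def __init__(self):
        self.consumer_num = itertools.count(1)
        self.consumers = []

    def declare_consumer(self, name):
        # Each declaration takes the next integer from the counter, stored
        # as a string because AMQP consumer tags are strings.
        tag = str(next(self.consumer_num))
        self.consumers.append((tag, name))
        return tag


conn = ToyConnection()
conn.declare_consumer("topic")          # step # 1 (hypothetical label)
conn.declare_consumer("topic.server")   # step # 2
conn.declare_consumer("fanout")         # step # 3
print([tag for tag, _ in conn.consumers])  # ['1', '2', '3']
```

On a connection that never drops, the three consumers simply get tags 1, 2 and 3.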

What if a connection error occurs during step # 2? At that moment, self.consumers already holds a TopicConsumer with tag = 1. The error triggers the on_reconnection method in impl_rabbit.py, but on_reconnection resets consumer_num to itertools.count(1) (# 4):

        def on_reconnection(new_channel):
            """Callback invoked when the kombu reconnects and creates
            a new channel, we use it to reconfigure our consumers.
            """
            self._set_current_channel(new_channel)
            self.consumer_num = itertools.count(1) # 4
            for consumer in self.consumers:
                consumer.reconnect(new_channel)

            LOG.info(_LI('Reconnected to AMQP server on '
                         '%(hostname)s:%(port)d'),
                     {'hostname': self.connection.hostname,
                      'port': self.connection.port})

So, once the client has reconnected to the RabbitMQ server, step # 2 is retried and takes the next value from the freshly reset counter: it declares a second consumer with tag 1 and stores it in self.consumers next to the first one. Aha!
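The failure sequence can be simulated without a broker. This is a sketch under stated assumptions: FlakyConnection is a hypothetical model, the connection error is injected on a chosen declaration, and the failed declaration is retried once after on_reconnection(), which is how the report describes step # 2 repeating. The real driver's retry machinery is not reproduced here.

```python
import itertools
from collections import Counter


class FlakyConnection:
    """Hypothetical model of the pre-fix client: a declaration that hits a
    connection error triggers on_reconnection() and is then retried."""

    def __init__(self, fail_on_call):
        self.consumer_num = itertools.count(1)
        self.consumers = []
        self._calls = 0
        self._fail_on_call = fail_on_call

    def on_reconnection(self):
        # Old behaviour (line # 4): numbering restarts at 1 even though
        # self.consumers still holds the consumers declared so far.
        self.consumer_num = itertools.count(1)

    def _declare_once(self, name):
        tag = str(next(self.consumer_num))
        self._calls += 1
        if self._calls == self._fail_on_call:
            # Simulate the broker closing the connection mid-declaration.
            raise ConnectionError("connection closed by broker")
        self.consumers.append((tag, name))
        return tag

    def declare_consumer(self, name):
        try:
            return self._declare_once(name)
        except ConnectionError:
            # Reconnect (resetting the counter), then retry the declaration.
            self.on_reconnection()
            return self._declare_once(name)


conn = FlakyConnection(fail_on_call=2)  # the drop happens during step # 2
for name in ("topic", "topic.server", "fanout"):
    conn.declare_consumer(name)

tags = [tag for tag, _ in conn.consumers]
duplicates = [tag for tag, n in Counter(tags).items() if n > 1]
print(tags)        # ['1', '1', '2']
print(duplicates)  # ['1']
```

The first consumer keeps tag 1 across the reconnect, while the retried step # 2 draws 1 again from the reset counter, so the list ends up with two consumers tagged 1.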

Now it is time to actually start consuming from the broker, so the RabbitMQ server rejects the second consumer with tag 1 with this NOT_ALLOWED (530) error and closes the connection.
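On the broker side, AMQP 0-9-1 requires a consumer tag to be unique within a channel, and a basic.consume that reuses an active tag is refused with NOT_ALLOWED. The toy channel below is a hypothetical model of that rule (not RabbitMQ code), fed with the tag list left behind by the failed step # 2; the queue labels are made up.

```python
class NotAllowed(Exception):
    """Stand-in for the AMQP 530 NOT_ALLOWED error."""


class ToyChannel:
    """Hypothetical broker-side view of one channel: consumer tags must be
    unique per channel, so a reused tag is refused."""

    def __init__(self):
        self.active_tags = set()

    def basic_consume(self, queue, consumer_tag):
        if consumer_tag in self.active_tags:
            raise NotAllowed(
                "(530) NOT_ALLOWED - attempt to reuse consumer tag '%s'"
                % consumer_tag)
        self.active_tags.add(consumer_tag)


# Consumers as left behind by the failed step # 2.
consumers = [("1", "topic"), ("1", "topic.server"), ("2", "fanout")]

channel = ToyChannel()
error = None
try:
    for tag, queue in consumers:
        channel.basic_consume(queue, tag)
except NotAllowed as exc:
    error = str(exc)

print(error)  # (530) NOT_ALLOWED - attempt to reuse consumer tag '1'
```

The first consumer registers tag 1, and the second one with the same tag is rejected before the fanout consumer is ever reached.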

I have read the rabbit driver code in oslo_messaging and I don't see what line # 4 is intended for. So I refactored on_reconnection as follows, which solves this bug:

        def on_reconnection(new_channel):
            """Callback invoked when the kombu reconnects and creates
            a new channel, we use it to reconfigure our consumers.
            """
            self._set_current_channel(new_channel)
            if self.consumers:
                # Resume numbering above the highest tag still in use.
                # Tags are strings, so compare them as integers ('10' > '9').
                self.consumer_num = itertools.count(
                    max(int(c.tag) for c in self.consumers) + 1)
            else:
                self.consumer_num = itertools.count(1)
            for consumer in self.consumers:
                consumer.reconnect(new_channel)

            LOG.info(_LI('Reconnected to AMQP server on '
                         '%(hostname)s:%(port)d'),
                     {'hostname': self.connection.hostname,
                      'port': self.connection.port})
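The core of the fix is choosing where the counter resumes. The sketch below isolates that choice in a hypothetical helper, resumed_counter, which is not part of oslo.messaging. It also shows why the comparison should be numeric: max() over the raw tag strings ranks '9' above '10', which would restart the counter at 10 and collide with an existing tag.

```python
import itertools


def resumed_counter(tags):
    """Hypothetical helper mirroring the proposed on_reconnection fix:
    continue numbering above the highest tag still in use."""
    if tags:
        # Compare numerically; max() over the raw strings ranks '9' above '10'.
        return itertools.count(max(int(tag) for tag in tags) + 1)
    return itertools.count(1)


# Only consumer # 1 was stored when the connection dropped during step # 2.
counter = resumed_counter(["1"])
retried_tag = str(next(counter))
print(retried_tag)        # 2
print(max(["9", "10"]))   # 9
```

Keeping the counter monotonic (never resetting it on reconnect) would avoid the collision as well; the approach above keeps the reset but starts it past every live tag.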

Steps to reproduce this bug:
    1. Put a breakpoint between the conn.declare_XXX_consumer calls, as follows:
    def listen(self, target):
        conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN)

        listener = AMQPListener(self, conn)

        conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                                    topic=target.topic,
                                    callback=listener)
        import pdb; pdb.set_trace()  # 5
        conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                                    topic='%s.%s' % (target.topic,
                                                     target.server),
                                    callback=listener)
        conn.declare_fanout_consumer(target.topic, listener)

        return listener

    2. When execution stops at the breakpoint (# 5), close the connection from the RabbitMQ server side using the command: rabbitmqctl close_connection.
    3. Continue running the code.

Ken Giusti (kgiusti) wrote :

hebin -

What release of oslo.messaging are you using? The latest supported versions of oslo.messaging (ocata & pike) no longer have that weird iterator code.

In fact that code was removed way back in the liberty release.

Can you upgrade to one of the supported versions and reproduce?

thanks

Ken Giusti (kgiusti)
Changed in oslo.messaging:
status: New → Incomplete
Launchpad Janitor (janitor) wrote :

[Expired for oslo.messaging because there has been no activity for 60 days.]

Changed in oslo.messaging:
status: Incomplete → Expired