rabbit: duplicated consumer tags
Affects | Status | Importance | Assigned to | Milestone | |
---|---|---|---|---|---|
oslo.messaging | Expired | Undecided | Unassigned | | |
Bug Description
When more than 1500 compute nodes connect to a single RabbitMQ broker simultaneously, I see messaging errors on a few compute nodes. The error mainly reported is: Failed to consume message from queue: Basic.consume: (530) NOT_ALLOWED - attempt to reuse consumer tag '1'. I think this means the tags of the consumers (self.consumers) are duplicated. Why does this occur?
When many compute nodes (more than 1500 in my case) connect to the rabbit broker, the RabbitMQ server has to work hard to process the incoming TCP connection requests. A compute node can first establish a connection with the broker successfully, and then have that connection dropped by the RabbitMQ server because of its heavy workload.
When the RPC server of the nova-compute service starts, the rabbit connection object declares three consumers and stores them in self.consumers:
def listen(self, target):
conn = self._get_
listener = AMQPListener(self, conn)
return listener
What if a connection error occurs in step # 2? At that moment, there is already a TopicConsumer in self.consumers with tag = 1. The driver calls the on_reconnection method in impl_rabbit.py, but on_reconnection resets consumer_num to itertools.count(1) (# 4):
def on_reconnection
a new channel, we use it the reconfigure our consumers.
"""
for consumer in self.consumers:
So, after successfully reconnecting to the RabbitMQ server, step # 2 is repeated and defines another consumer with tag 1, which is also stored in self.consumers. AHA!
Now it is time to actually start consuming from the broker, and the RabbitMQ server reports the NOT_ALLOWED (530) error and closes the connection.
I have read the rabbit code in oslo_messaging, but I don't know what line # 4 is intended for! So I refactored on_reconnection as follows, and it solves this bug:
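The sequence described above can be modelled with a small stand-alone sketch. This is not the oslo.messaging code: ToyConnection, its methods, and the consumer names are hypothetical stand-ins, and the only behaviour taken from the report is that reconnection resets the tag counter to itertools.count(1) while the consumers declared before the failure stay in self.consumers.

```python
import itertools


class ToyConnection:
    """Hypothetical stand-in for the rabbit driver's connection object."""

    def __init__(self):
        self.consumers = []                     # (name, tag) pairs
        self.consumer_num = itertools.count(1)  # source of consumer tags

    def declare_consumer(self, name):
        # Each new consumer takes the next tag from the counter.
        tag = str(next(self.consumer_num))
        self.consumers.append((name, tag))
        return tag

    def on_reconnection(self):
        # Behaviour described in the report: the counter restarts at 1,
        # but consumers declared before the failure are kept.
        self.consumer_num = itertools.count(1)


def duplicated_tags(conn):
    """Return the sorted list of tags used by more than one consumer."""
    tags = [tag for _, tag in conn.consumers]
    return sorted({t for t in tags if tags.count(t) > 1})


conn = ToyConnection()
conn.declare_consumer("topic")       # first consumer gets tag '1'
conn.on_reconnection()               # connection drops before the rest are declared
conn.declare_consumer("topic")       # retried declaration gets tag '1' again
conn.declare_consumer("topic.host")  # tag '2'
conn.declare_consumer("fanout")      # tag '3'

print(duplicated_tags(conn))         # ['1']
```

In this model two entries in self.consumers share tag '1', which matches the tag the broker rejects in the error message.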
def on_reconnection
a new channel, we use it the reconfigure our consumers.
"""
if self.consumers:
else:
for consumer in self.consumers:
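The bodies of the if/else branches are not visible in the excerpt above, so the following is only a guess at the shape of such a fix, written against a hypothetical toy class rather than the real driver. It assumes the surviving consumers hold contiguous tags starting at 1, and continues numbering after them instead of restarting at 1.

```python
import itertools


class ToyConnectionFixed:
    """Hypothetical model of a connection whose tag counter survives reconnects."""

    def __init__(self):
        self.consumers = []                     # (name, tag) pairs
        self.consumer_num = itertools.count(1)  # source of consumer tags

    def declare_consumer(self, name):
        # Each new consumer takes the next tag from the counter.
        tag = str(next(self.consumer_num))
        self.consumers.append((name, tag))
        return tag

    def on_reconnection(self):
        if self.consumers:
            # Assumption: existing tags are 1..len(consumers), so continue
            # numbering after them to avoid reusing a tag.
            self.consumer_num = itertools.count(len(self.consumers) + 1)
        else:
            # Nothing declared yet, so restarting at 1 is safe.
            self.consumer_num = itertools.count(1)


fixed = ToyConnectionFixed()
fixed.declare_consumer("topic")       # tag '1'
fixed.on_reconnection()               # simulated connection loss
fixed.declare_consumer("topic")       # tag '2' instead of a second '1'
fixed.declare_consumer("topic.host")  # tag '3'
fixed.declare_consumer("fanout")      # tag '4'

fixed_tags = [tag for _, tag in fixed.consumers]
print(fixed_tags)                     # ['1', '2', '3', '4']
```

This sketch only removes the tag collision; it still leaves the retried consumer declared twice, which the real code may or may not handle elsewhere.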
Steps to reproduce this bug:
1. Put some breakpoints in conn.declare_
def listen(self, target):
conn = self._get_
listener = AMQPListener(self, conn)
# import pdb; pdb.set_trace() # 5
return listener
2. When execution reaches the breakpoint (# 5), close the connection from the RabbitMQ server with the command: rabbitmqctl close_connection.
3. Continue running the code.
description: | updated |
Changed in oslo.messaging: | |
status: | New → Incomplete |
hebin -
What release of oslo.messaging are you using? The latest supported versions of oslo.messaging (Ocata and Pike) no longer have that weird iterator code.
In fact, that code was removed way back in the Liberty release.
Can you upgrade to one of the supported versions and reproduce?
thanks