We suspect that the connection pool implementation within oslo_messaging library is not greenthread aware. We simulated the use of oslo_messaging connection pool from an independent application (Execution using python 2.7.8) using two thread models: 1. Parallel requests using green threads. {code} ################################################################# # @Usage : python file_name.py hostname port username password # # hostname : Machine IP on which rabbitMQ server is running # # port : port on which rabbitMQ server is listening # # username : username of hostname # # password : password of hostname # ################################################################# import sys import threading import os os.environ['PBR_VERSION']='1.10.0' from oslo_config import cfg import eventlet from oslo_messaging._drivers import pool from oslo_messaging._drivers.impl_rabbit import Connection from oslo_messaging._drivers import amqp as rpc_amqp from oslo_messaging._drivers import amqpdriver from oslo_messaging._drivers import base from oslo_messaging.transport import get_transport from oslo_messaging.transport import TransportURL, TransportHost max_size = 30 min_size = 0 ttl = 1200 """ The below part of code to initialize rabbit_opts is copy-paste from .../oslo_messaging/_drivers/impl_rabbit.py Line 56 to 204 version : oslo.messaging-5.12.0 """ rabbit_opts = [ cfg.StrOpt('kombu_ssl_version', default='', deprecated_group='DEFAULT', help='SSL version to use (valid only if SSL enabled). ' 'Valid values are TLSv1 and SSLv23. SSLv2, SSLv3, ' 'TLSv1_1, and TLSv1_2 may be available on some ' 'distributions.' ), cfg.StrOpt('kombu_ssl_keyfile', default='', deprecated_group='DEFAULT', help='SSL key file (valid only if SSL enabled).'), cfg.StrOpt('kombu_ssl_certfile', default='', deprecated_group='DEFAULT', help='SSL cert file (valid only if SSL enabled).'), cfg.StrOpt('kombu_ssl_ca_certs', default='', deprecated_group='DEFAULT', help='SSL certification authority file ' '(valid only if SSL enabled).'), cfg.FloatOpt('kombu_reconnect_delay', default=1.0, deprecated_group='DEFAULT', help='How long to wait before reconnecting in response to an ' 'AMQP consumer cancel notification.'), cfg.StrOpt('kombu_compression', help="EXPERIMENTAL: Possible values are: gzip, bz2. If not " "set compression will not be used. This option may not " "be available in future versions."), cfg.IntOpt('kombu_missing_consumer_retry_timeout', deprecated_name="kombu_reconnect_timeout", default=60, help='How long to wait a missing client before abandoning to ' 'send it its replies. This value should not be longer ' 'than rpc_response_timeout.'), cfg.StrOpt('kombu_failover_strategy', choices=('round-robin', 'shuffle'), default='round-robin', help='Determines how the next RabbitMQ node is chosen in case ' 'the one we are currently connected to becomes ' 'unavailable. Takes effect only if more than one ' 'RabbitMQ node is provided in config.'), cfg.StrOpt('rabbit_host', default='localhost', deprecated_group='DEFAULT', deprecated_for_removal=True, deprecated_reason="Replaced by [DEFAULT]/transport_url", help='The RabbitMQ broker address where a single node is ' 'used.'), cfg.PortOpt('rabbit_port', default=5672, deprecated_group='DEFAULT', deprecated_for_removal=True, deprecated_reason="Replaced by [DEFAULT]/transport_url", help='The RabbitMQ broker port where a single node is used.'), cfg.ListOpt('rabbit_hosts', default=['$rabbit_host:$rabbit_port'], deprecated_group='DEFAULT', deprecated_for_removal=True, deprecated_reason="Replaced by [DEFAULT]/transport_url", help='RabbitMQ HA cluster host:port pairs.'), cfg.BoolOpt('rabbit_use_ssl', default=False, deprecated_group='DEFAULT', help='Connect over SSL for RabbitMQ.'), cfg.StrOpt('rabbit_userid', default='guest', deprecated_group='DEFAULT', deprecated_for_removal=True, deprecated_reason="Replaced by [DEFAULT]/transport_url", help='The RabbitMQ userid.'), cfg.StrOpt('rabbit_password', default='guest', deprecated_group='DEFAULT', deprecated_for_removal=True, deprecated_reason="Replaced by [DEFAULT]/transport_url", help='The RabbitMQ password.', secret=True), cfg.StrOpt('rabbit_login_method', choices=('PLAIN', 'AMQPLAIN', 'RABBIT-CR-DEMO'), default='AMQPLAIN', deprecated_group='DEFAULT', help='The RabbitMQ login method.'), cfg.StrOpt('rabbit_virtual_host', default='/', deprecated_group='DEFAULT', deprecated_for_removal=True, deprecated_reason="Replaced by [DEFAULT]/transport_url", help='The RabbitMQ virtual host.'), cfg.IntOpt('rabbit_retry_interval', default=1, help='How frequently to retry connecting with RabbitMQ.'), cfg.IntOpt('rabbit_retry_backoff', default=2, deprecated_group='DEFAULT', help='How long to backoff for between retries when connecting ' 'to RabbitMQ.'), cfg.IntOpt('rabbit_interval_max', default=30, help='Maximum interval of RabbitMQ connection retries. ' 'Default is 30 seconds.'), cfg.IntOpt('rabbit_max_retries', default=0, deprecated_for_removal=True, deprecated_group='DEFAULT', help='Maximum number of RabbitMQ connection retries. ' 'Default is 0 (infinite retry count).'), cfg.BoolOpt('rabbit_ha_queues', default=False, deprecated_group='DEFAULT', help='Try to use HA queues in RabbitMQ (x-ha-policy: all). ' 'If you change this option, you must wipe the RabbitMQ ' 'database. In RabbitMQ 3.0, queue mirroring is no longer ' 'controlled by the x-ha-policy argument when declaring a ' 'queue. If you just want to make sure that all queues (except ' 'those with auto-generated names) are mirrored across all ' 'nodes, run: ' """\"rabbitmqctl set_policy HA '^(?!amq\.).*' """ """'{"ha-mode": "all"}' \""""), cfg.IntOpt('rabbit_transient_queues_ttl', min=1, default=1800, help='Positive integer representing duration in seconds for ' 'queue TTL (x-expires). Queues which are unused for the ' 'duration of the TTL are automatically deleted. The ' 'parameter affects only reply and fanout queues.'), cfg.IntOpt('rabbit_qos_prefetch_count', default=0, help='Specifies the number of messages to prefetch. Setting to ' 'zero allows unlimited messages.'), cfg.IntOpt('heartbeat_timeout_threshold', default=60, help="Number of seconds after which the Rabbit broker is " "considered down if heartbeat's keep-alive fails " "(0 disable the heartbeat). EXPERIMENTAL"), cfg.IntOpt('heartbeat_rate', default=2, help='How often times during the heartbeat_timeout_threshold ' 'we check the heartbeat.'), # NOTE(sileht): deprecated option since oslo_messaging 1.5.0, cfg.BoolOpt('fake_rabbit', default=False, deprecated_group='DEFAULT', help='Deprecated, use rpc_backend=kombu+memory or ' 'rpc_backend=fake'), ] def main(args): global rabbit_opts conf = cfg.CONF opt_group = cfg.OptGroup(name='oslo_messaging_rabbit', title='RabbitMQ driver options') conf.register_opts(rabbit_opts, group=opt_group) conf.register_opts(rpc_amqp.amqp_opts, group=opt_group) conf.register_opts(base.base_opts, group=opt_group) conf.register_group(opt_group) host = [] hst,prt,u,p = args[1],args[2],args[3],args[4] host.append(TransportHost(hostname=hst, port=prt, username=u, password=p)) url = TransportURL(conf, transport = "rabbit", hosts = host) connection_pool = pool.ConnectionPool(conf, max_size,min_size,ttl,url,Connection) execute(connection_pool) get_count = 0 put_count = 0 def get_sleep_put(connection_pool): global get_count global put_count print "Before connection_pool.get() . Concurrent active connection objects are %s" %get_count item = connection_pool.get() get_count += 1 print "After connection_pool.get() . Concurrent active connection objects are %s" %get_count eventlet.sleep(20) print "Before connection_pool.put() . Current available connection objects are %s" %put_count connection_pool.put(item) put_count += 1 print "After connection_pool.put() . Current available connection objects are %s" %put_count def execute(connection_pool): POOL_SIZE = 50 pool = eventlet.GreenPool(POOL_SIZE) for i in range(POOL_SIZE): pool.spawn(get_sleep_put, connection_pool) pool.waitall() if __name__ == "__main__": if len(sys.argv) != 5: print "**** Invalid usage ****" exit() main(sys.argv) print "**** Application Finished ****" {code} 2. Parallel requests using kernel threads {code} ################################################################# # @Usage : python file_name.py hostname port username password # # hostname : Machine IP on which rabbitMQ server is running # # port : port on which rabbitMQ server is listening # # username : username of hostname # # password : password of hostname # ################################################################# import sys import threading import os os.environ['PBR_VERSION']='1.10.0' from oslo_config import cfg import time from oslo_messaging._drivers import pool from oslo_messaging._drivers.impl_rabbit import Connection from oslo_messaging._drivers import amqp as rpc_amqp from oslo_messaging._drivers import amqpdriver from oslo_messaging._drivers import base from oslo_messaging.transport import get_transport from oslo_messaging.transport import TransportURL, TransportHost max_size = 30 min_size = 0 ttl = 1200 """ The below part of code to initialize rabbit_opts is copy-paste from .../oslo_messaging/_drivers/impl_rabbit.py Line 56 to 204 version : oslo.messaging-5.12.0 """ rabbit_opts = [ cfg.StrOpt('kombu_ssl_version', default='', deprecated_group='DEFAULT', help='SSL version to use (valid only if SSL enabled). ' 'Valid values are TLSv1 and SSLv23. SSLv2, SSLv3, ' 'TLSv1_1, and TLSv1_2 may be available on some ' 'distributions.' ), cfg.StrOpt('kombu_ssl_keyfile', default='', deprecated_group='DEFAULT', help='SSL key file (valid only if SSL enabled).'), cfg.StrOpt('kombu_ssl_certfile', default='', deprecated_group='DEFAULT', help='SSL cert file (valid only if SSL enabled).'), cfg.StrOpt('kombu_ssl_ca_certs', default='', deprecated_group='DEFAULT', help='SSL certification authority file ' '(valid only if SSL enabled).'), cfg.FloatOpt('kombu_reconnect_delay', default=1.0, deprecated_group='DEFAULT', help='How long to wait before reconnecting in response to an ' 'AMQP consumer cancel notification.'), cfg.StrOpt('kombu_compression', help="EXPERIMENTAL: Possible values are: gzip, bz2. If not " "set compression will not be used. This option may not " "be available in future versions."), cfg.IntOpt('kombu_missing_consumer_retry_timeout', deprecated_name="kombu_reconnect_timeout", default=60, help='How long to wait a missing client before abandoning to ' 'send it its replies. This value should not be longer ' 'than rpc_response_timeout.'), cfg.StrOpt('kombu_failover_strategy', choices=('round-robin', 'shuffle'), default='round-robin', help='Determines how the next RabbitMQ node is chosen in case ' 'the one we are currently connected to becomes ' 'unavailable. Takes effect only if more than one ' 'RabbitMQ node is provided in config.'), cfg.StrOpt('rabbit_host', default='localhost', deprecated_group='DEFAULT', deprecated_for_removal=True, deprecated_reason="Replaced by [DEFAULT]/transport_url", help='The RabbitMQ broker address where a single node is ' 'used.'), cfg.PortOpt('rabbit_port', default=5672, deprecated_group='DEFAULT', deprecated_for_removal=True, deprecated_reason="Replaced by [DEFAULT]/transport_url", help='The RabbitMQ broker port where a single node is used.'), cfg.ListOpt('rabbit_hosts', default=['$rabbit_host:$rabbit_port'], deprecated_group='DEFAULT', deprecated_for_removal=True, deprecated_reason="Replaced by [DEFAULT]/transport_url", help='RabbitMQ HA cluster host:port pairs.'), cfg.BoolOpt('rabbit_use_ssl', default=False, deprecated_group='DEFAULT', help='Connect over SSL for RabbitMQ.'), cfg.StrOpt('rabbit_userid', default='guest', deprecated_group='DEFAULT', deprecated_for_removal=True, deprecated_reason="Replaced by [DEFAULT]/transport_url", help='The RabbitMQ userid.'), cfg.StrOpt('rabbit_password', default='guest', deprecated_group='DEFAULT', deprecated_for_removal=True, deprecated_reason="Replaced by [DEFAULT]/transport_url", help='The RabbitMQ password.', secret=True), cfg.StrOpt('rabbit_login_method', choices=('PLAIN', 'AMQPLAIN', 'RABBIT-CR-DEMO'), default='AMQPLAIN', deprecated_group='DEFAULT', help='The RabbitMQ login method.'), cfg.StrOpt('rabbit_virtual_host', default='/', deprecated_group='DEFAULT', deprecated_for_removal=True, deprecated_reason="Replaced by [DEFAULT]/transport_url", help='The RabbitMQ virtual host.'), cfg.IntOpt('rabbit_retry_interval', default=1, help='How frequently to retry connecting with RabbitMQ.'), cfg.IntOpt('rabbit_retry_backoff', default=2, deprecated_group='DEFAULT', help='How long to backoff for between retries when connecting ' 'to RabbitMQ.'), cfg.IntOpt('rabbit_interval_max', default=30, help='Maximum interval of RabbitMQ connection retries. ' 'Default is 30 seconds.'), cfg.IntOpt('rabbit_max_retries', default=0, deprecated_for_removal=True, deprecated_group='DEFAULT', help='Maximum number of RabbitMQ connection retries. ' 'Default is 0 (infinite retry count).'), cfg.BoolOpt('rabbit_ha_queues', default=False, deprecated_group='DEFAULT', help='Try to use HA queues in RabbitMQ (x-ha-policy: all). ' 'If you change this option, you must wipe the RabbitMQ ' 'database. In RabbitMQ 3.0, queue mirroring is no longer ' 'controlled by the x-ha-policy argument when declaring a ' 'queue. If you just want to make sure that all queues (except ' 'those with auto-generated names) are mirrored across all ' 'nodes, run: ' """\"rabbitmqctl set_policy HA '^(?!amq\.).*' """ """'{"ha-mode": "all"}' \""""), cfg.IntOpt('rabbit_transient_queues_ttl', min=1, default=1800, help='Positive integer representing duration in seconds for ' 'queue TTL (x-expires). Queues which are unused for the ' 'duration of the TTL are automatically deleted. The ' 'parameter affects only reply and fanout queues.'), cfg.IntOpt('rabbit_qos_prefetch_count', default=0, help='Specifies the number of messages to prefetch. Setting to ' 'zero allows unlimited messages.'), cfg.IntOpt('heartbeat_timeout_threshold', default=60, help="Number of seconds after which the Rabbit broker is " "considered down if heartbeat's keep-alive fails " "(0 disable the heartbeat). EXPERIMENTAL"), cfg.IntOpt('heartbeat_rate', default=2, help='How often times during the heartbeat_timeout_threshold ' 'we check the heartbeat.'), # NOTE(sileht): deprecated option since oslo_messaging 1.5.0, cfg.BoolOpt('fake_rabbit', default=False, deprecated_group='DEFAULT', help='Deprecated, use rpc_backend=kombu+memory or ' 'rpc_backend=fake'), ] def main(args): global rabbit_opts conf = cfg.CONF opt_group = cfg.OptGroup(name='oslo_messaging_rabbit', title='RabbitMQ driver options') conf.register_opts(rabbit_opts, group=opt_group) conf.register_opts(rpc_amqp.amqp_opts, group=opt_group) conf.register_opts(base.base_opts, group=opt_group) conf.register_group(opt_group) host = [] hst,prt,u,p = args[1],args[2],args[3],args[4] host.append(TransportHost(hostname=hst, port=prt, username=u, password=p)) url = TransportURL(conf, transport = "rabbit", hosts = host) connection_pool = pool.ConnectionPool(conf, max_size,min_size,ttl,url,Connection) execute(connection_pool) get_count = 0 put_count = 0 def get_sleep_put(connection_pool): global get_count global put_count print "Before connection_pool.get() . Concurrent active connection objects are %s" %get_count item = connection_pool.get() get_count += 1 print "After connection_pool.get() . Concurrent active connection objects are %s" %get_count time.sleep(20) print "Before connection_pool.put() . Current available connection objects are %s" %put_count connection_pool.put(item) put_count += 1 print "After connection_pool.put() . Current available connection objects are %s" %put_count def execute(connection_pool): POOL_SIZE = 50 lst = [] for i in range(POOL_SIZE): d = threading.Thread(name='daemon', target=get_sleep_put, args=(connection_pool,)) lst.append(d) d.start() for t in lst: t.join() if __name__ == "__main__": if len(sys.argv) != 5: print "**** Invalid usage ****" exit() main(sys.argv) print "**** Application Finished ****" {code} The first application was stuck as soon as 30 connections were established and used up from connection pool. The 31st request was waiting indefinitely for a connection from pool. The second application completed successfully. Once the first 30 requests were completed, the next set of requests were handled successfully. Based on above, we have observed and verified that connection pool mechanism in oslo_messaging library is not green thread aware.