Cannot stop messaging listener after health policy detached

Bug #1732355 reported by RUIJIE YUAN
This bug affects 1 person
| Affects | Status | Importance | Assigned to | Milestone |
|---|---|---|---|---|
| senlin | In Progress | Undecided | chenyb4 | |

Bug Description

We defined the message handlers in `health_manager.py`; they process incoming notifications from Nova or Heat. A factory method registers the endpoints and creates a listener that polls messages from the target topics, applying the endpoints' filter rules. However:

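```python
import oslo_messaging as messaging

# With executor='threading', oslo.messaging hands incoming notifications to
# the endpoints from a background thread pool.
listener = messaging.get_notification_listener(
        transport, targets, endpoints, executor='threading',
        pool="senlin-listeners")

# start() returns immediately; polling continues in the background.
listener.start()
```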

The code above creates a listener through the messaging driver, and the driver silently creates an executor backed by a thread pool to process messages from the message queue. Once a message is polled from the target topic, it is passed to the handler (endpoint) object.
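To make the moving parts concrete, here is a minimal stdlib model of the behaviour described above: a poller thread pulls messages off a queue and hands each one to a thread pool, which calls the matching method (for example `info`) on every endpoint. `ToyNotificationListener` is a hypothetical stand-in, not oslo.messaging code; the real `threading` executor and driver internals differ in detail.

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor


class ToyNotificationListener:
    """Hypothetical model of a notification listener (not oslo.messaging)."""

    def __init__(self, incoming, endpoints, max_workers=4):
        self._incoming = incoming      # stands in for the message queue
        self._endpoints = endpoints    # handler objects, e.g. with info()
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._running = threading.Event()
        self._poller = None

    def start(self):
        # Returns immediately; polling happens on a separate thread.
        self._running.set()
        self._poller = threading.Thread(target=self._poll, daemon=True)
        self._poller.start()

    def _poll(self):
        while self._running.is_set():
            try:
                priority, payload = self._incoming.get(timeout=0.05)
            except queue.Empty:
                continue
            # Dispatch on the pool, not on the poller thread.
            for ep in self._endpoints:
                handler = getattr(ep, priority, None)
                if handler is not None:
                    self._executor.submit(handler, payload)

    def stop(self):
        # Stop polling for new messages.
        self._running.clear()
        self._poller.join()

    def wait(self):
        # Block until in-flight handlers finish, then release the pool.
        self._executor.shutdown(wait=True)
```

As in oslo.messaging, `stop()` only ends polling; `wait()` is what drains the pool.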

We use a `ThreadGroup` to start the listener:
```python
return self.TG.add_thread(ListenerProc, exchange, project, cluster_id)
```

This call returns immediately once the listener has been created and started. Its return value is the `Thread` that created and started the listener, not the listener itself, and that thread finishes right away too, so nothing is left holding a reference to the running listener.

```python
# `listener` here is the Thread returned by add_thread(), not the
# oslo.messaging listener: this kills the thread but does not stop the listener.
self.TG.thread_done(listener)
listener.stop()
```
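The sketch below reproduces the problem with standard `threading` (oslo.service's `ThreadGroup` actually runs eventlet green threads, which this sketch does not model). `listener_proc` and `BackgroundListener` are hypothetical stand-ins for `ListenerProc` and the oslo.messaging listener. The launching thread exits as soon as `start()` returns, while the listener's own polling thread keeps running. The `created` list exists only so the demo can inspect the listener; in the real code nothing retains it, so there is nothing left to call `stop()` on.

```python
import threading
import time


class BackgroundListener:
    """Hypothetical listener that polls on its own thread."""

    def __init__(self):
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        while not self._stop.is_set():
            time.sleep(0.01)  # stands in for polling the message queue

    def start(self):
        self._thread.start()

    def stop(self):
        self._stop.set()
        self._thread.join()

    @property
    def running(self):
        return self._thread.is_alive()


created = []  # demo-only handle; the real code keeps no reference


def listener_proc():
    """Mirrors the ListenerProc pattern: create, start, then return."""
    listener = BackgroundListener()
    listener.start()
    created.append(listener)


launcher = threading.Thread(target=listener_proc)
launcher.start()
launcher.join()

# The launching thread is gone, but the listener it started is still polling.
print(launcher.is_alive())   # False
print(created[0].running)    # True
```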

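oslo.messaging provides two methods to stop the listener, `stop()` and `wait()` (excerpt from `MessageHandlingServer`, the base class of the object returned by `get_notification_listener()`):
```python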
    @ordered(after='start')
    def stop(self):
        """Stop handling incoming messages.

        Once this method returns, no new incoming messages will be handled by
        the server. However, the server may still be in the process of handling
        some messages, and underlying driver resources associated to this
        server are still in use. See 'wait' for more details.
        """
        self.listener.stop()
        self._started = False

    @ordered(after='stop')
    def wait(self):
        """Wait for message processing to complete.

        After calling stop(), there may still be some existing messages
        which have not been completely processed. The wait() method blocks
        until all message processing has completed.

        Once it's finished, the underlying driver resources associated to this
        server are released (like closing useless network connections).
        """
        self._work_executor.shutdown(wait=True)

        # Close listener connection after processing all messages
        self.listener.cleanup()

```
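Given these two methods, shutting a listener down means calling `stop()` and then `wait()`. A minimal helper under that assumption (the name `shutdown_listeners` is hypothetical) works with any object exposing both methods, such as the object returned by `oslo_messaging.get_notification_listener()`:

```python
def shutdown_listeners(listeners):
    """Stop, then wait on, each listener (hypothetical helper).

    ``listeners`` is any iterable of objects with stop() and wait(),
    e.g. objects returned by oslo_messaging.get_notification_listener().
    """
    listeners = list(listeners)
    for listener in listeners:
        listener.stop()   # no new incoming messages are handled after this
    for listener in listeners:
        listener.wait()   # blocks until in-flight messages are processed
                          # and driver resources are released
```

Stopping every listener before waiting on any of them lets them drain their in-flight messages concurrently instead of one after another.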

RUIJIE YUAN (cnjie0616) wrote:

Ideally, we would create and start the listener directly in the current thread, so that the return value is the listener itself. That way we can stop the listener when needed.
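A minimal sketch of that idea, assuming a hypothetical `ListenerRegistry` keyed by cluster ID: the caller starts the listener itself and keeps the listener object (not a thread), so detaching the health policy can call `stop()` and `wait()` on it. The `factory` argument is an assumption; in Senlin it could wrap `oslo_messaging.get_notification_listener(...)`, but here any object with `start()`, `stop()` and `wait()` works.

```python
import threading


class ListenerRegistry:
    """Hypothetical per-cluster registry that keeps listener objects."""

    def __init__(self, factory):
        self._factory = factory        # cluster_id -> unstarted listener
        self._listeners = {}
        self._lock = threading.Lock()

    def start(self, cluster_id):
        with self._lock:
            if cluster_id in self._listeners:
                return self._listeners[cluster_id]
            listener = self._factory(cluster_id)
            # For oslo.messaging, start() returns once polling runs in the
            # background, so it is safe to call from the current thread.
            listener.start()
            self._listeners[cluster_id] = listener
            return listener

    def stop(self, cluster_id):
        with self._lock:
            listener = self._listeners.pop(cluster_id, None)
        if listener is None:
            return False
        listener.stop()   # no new messages
        listener.wait()   # drain in-flight messages, release the connection
        return True
```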

RUIJIE YUAN (cnjie0616) wrote:

chenyb4 (chenyb4) wrote:

Yes, I have tested it; the oslo.messaging listener does stop.

Example:

http://paste.ubuntu.com/26011183/

I will run a simple test.

chenyb4 (chenyb4) wrote:

We also need to handle multi-cluster environments, where one cluster consumes a message and the other clusters then cannot consume it from the queue.
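The symptom matches how listener pools work in oslo.messaging: each pool receives a copy of a notification, and within a pool only one listener gets it. Since every cluster's listener is created with the same `pool="senlin-listeners"`, the clusters compete for the same messages. The toy model below (hypothetical `deliver` function, with round-robin standing in for the broker's load balancing) contrasts a shared pool with one pool per cluster. Per-cluster pool names are only one possible direction, not the fix from the linked review.

```python
from collections import defaultdict


def deliver(message, pools, cursors):
    """Return (listener, message) pairs for one published notification.

    pools:   dict mapping a pool name to the listeners in that pool
    cursors: per-pool counters used to pick the next listener (round-robin)
    """
    delivered = []
    for pool_name, members in pools.items():
        # One copy per pool, handed to exactly one member of that pool.
        member = members[cursors[pool_name] % len(members)]
        cursors[pool_name] += 1
        delivered.append((member, message))
    return delivered


event = "compute.instance.delete.end"  # sample event type

# Both clusters' listeners share one pool: only one of them sees the event.
shared = {"senlin-listeners": ["cluster-A", "cluster-B"]}
print(deliver(event, shared, defaultdict(int)))
# [('cluster-A', 'compute.instance.delete.end')]

# One pool per cluster (hypothetical naming): each cluster gets its own copy.
per_cluster = {
    "senlin-listeners-cluster-A": ["cluster-A"],
    "senlin-listeners-cluster-B": ["cluster-B"],
}
print(deliver(event, per_cluster, defaultdict(int)))
# [('cluster-A', 'compute.instance.delete.end'), ('cluster-B', 'compute.instance.delete.end')]
```

The trade-off is that each per-cluster listener then receives every notification on the target topic and must filter out events for other clusters itself.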

chenyb4 (chenyb4)
Changed in senlin:
assignee: nobody → chenyb4 (chenyb4)
Changed in senlin:
status: New → In Progress
OpenStack Infra (hudson-openstack) wrote: Change abandoned on senlin (master)

Change abandoned by "Erik Olof Gunnar Andersson <email address hidden>" on branch: master
Review: https://review.opendev.org/c/openstack/senlin/+/521892
