Cannot stop messaging listener after health policy detached

Bug #1732355 reported by RUIJIE YUAN on 2017-11-15
This bug affects 1 person
| Affects | Status | Importance | Assigned to | Milestone |
|---|---|---|---|---|
| senlin | In Progress | Undecided | chenyb4 | |

Bug Description

We defined the message handlers (endpoints) in `health_manager.py`, which process incoming notifications from the Nova or Heat side. A factory method registers these endpoints and creates a listener that polls messages from the target topics using a filter rule. However, look at how the listener is created and started:

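```
import oslo_messaging as messaging

# transport, targets and endpoints are prepared earlier by the factory method
listener = messaging.get_notification_listener(
    transport, targets, endpoints, executor='threading',
    pool="senlin-listeners")

# start() returns at once; polled messages are processed on executor threads
listener.start()
```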

The code above creates a listener through the messaging driver, and the driver silently creates an executor with a thread pool to process messages from the message queue. Once a message is polled from a target topic, it is handed to the handler (endpoint) object.
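To make the dispatch step concrete, here is a minimal toy model, not oslo.messaging's code. It assumes each polled message is a dict with `priority`, `publisher_id`, `event_type` and `payload` keys (an illustrative assumption). oslo.messaging calls the endpoint method named after the notification priority (`info`, `warn`, `error`, ...) with the arguments `(ctxt, publisher_id, event_type, payload, metadata)`; a `concurrent.futures.ThreadPoolExecutor` stands in for the driver's thread pool. `HealthEndpoint` and `dispatch` are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor, wait


class HealthEndpoint:
    """Hypothetical endpoint; it only handles 'info' notifications."""

    def __init__(self):
        self.events = []

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        # Runs on an executor thread, not on the thread that started the listener.
        self.events.append((event_type, payload["instance_id"]))


def dispatch(executor, endpoints, message):
    """Submit one polled message to each endpoint method named after its priority."""
    futures = []
    for endpoint in endpoints:
        handler = getattr(endpoint, message["priority"], None)
        if handler is not None:
            futures.append(executor.submit(
                handler, {}, message["publisher_id"], message["event_type"],
                message["payload"], {}))
    return futures


endpoint = HealthEndpoint()
messages = [
    {"priority": "info", "publisher_id": "compute.host1",
     "event_type": "compute.instance.delete.end",
     "payload": {"instance_id": "abc"}},
    # The endpoint has no debug() method, so this message is skipped.
    {"priority": "debug", "publisher_id": "compute.host1",
     "event_type": "compute.instance.update",
     "payload": {"instance_id": "abc"}},
]

with ThreadPoolExecutor(max_workers=4) as executor:
    for msg in messages:
        wait(dispatch(executor, [endpoint], msg))

print(endpoint.events)  # [('compute.instance.delete.end', 'abc')]
```

The thread that called `start()` never runs the handlers, which is why its lifetime says nothing about whether messages are still being consumed.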

We use a `ThreadGroup` to start the listener:
```
return self.TG.add_thread(ListenerProc, exchange, project, cluster_id)
```

`ListenerProc` returns immediately once the listener has been created and started. The value returned by `add_thread` is the `Thread` that ran `ListenerProc`, not the listener itself, and that thread finishes right away as well.
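The effect can be reproduced with plain Python threads. In this sketch a `threading.Thread` stands in for the `ThreadGroup` thread (an assumption; oslo.service uses green threads), and `BackgroundListener` is a hypothetical listener whose `start()` spawns its own polling thread, as the messaging driver does. The launching thread is finished, yet the listener keeps running until something holding a reference to it calls `stop()`.

```python
import threading


class BackgroundListener:
    """Hypothetical listener whose start() spawns its own polling thread."""

    def __init__(self):
        self._stopped = threading.Event()
        self._poller = threading.Thread(target=self._poll, daemon=True)

    def start(self):
        self._poller.start()

    def _poll(self):
        # Stand-in for polling the broker until stop() is called.
        while not self._stopped.wait(timeout=0.01):
            pass

    def stop(self):
        self._stopped.set()
        self._poller.join()

    def is_running(self):
        return self._poller.is_alive()


created = []


def listener_proc():
    # Same shape as ListenerProc: create, start, return.
    listener = BackgroundListener()
    listener.start()
    created.append(listener)  # kept only so the demo can inspect it afterwards


launcher = threading.Thread(target=listener_proc)
launcher.start()
launcher.join()

print(launcher.is_alive())      # False: the launching thread is already done
print(created[0].is_running())  # True: the listener is still polling
created[0].stop()
print(created[0].is_running())  # False
```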

```
# `listener` is the Thread returned by add_thread(), not the messaging
# listener: this kills the thread, but does not stop the listener
self.TG.thread_done(listener)
listener.stop()
```

oslo.messaging provides two methods (in `MessageHandlingServer`) for stopping a listener:
```
    @ordered(after='start')
    def stop(self):
        """Stop handling incoming messages.

        Once this method returns, no new incoming messages will be handled by
        the server. However, the server may still be in the process of handling
        some messages, and underlying driver resources associated to this
        server are still in use. See 'wait' for more details.
        """
        self.listener.stop()
        self._started = False

    @ordered(after='stop')
    def wait(self):
        """Wait for message processing to complete.

        After calling stop(), there may still be some existing messages
        which have not been completely processed. The wait() method blocks
        until all message processing has completed.

        Once it's finished, the underlying driver resources associated to this
        server are released (like closing useless network connections).
        """
        self._work_executor.shutdown(wait=True)

        # Close listener connection after processing all messages
        self.listener.cleanup()

```
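The split between `stop()` and `wait()` can be modelled with a small toy server. This is a sketch, not oslo.messaging's implementation: `ToyServer`, `deliver` and `connection_open` are hypothetical, a `queue.Queue` stands in for the broker, and setting `connection_open = False` stands in for `listener.cleanup()`. `stop()` only ends polling; handlers already submitted may still be running until `wait()` shuts the worker pool down.

```python
import queue
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class ToyServer:
    """Hypothetical model of the stop()/wait() contract quoted above."""

    def __init__(self, handler, max_workers=2):
        self._handler = handler
        self._incoming = queue.Queue()
        self._work_executor = ThreadPoolExecutor(max_workers=max_workers)
        self._stopping = threading.Event()
        self._poller = threading.Thread(target=self._poll)
        self.connection_open = False

    def start(self):
        self.connection_open = True
        self._poller.start()

    def deliver(self, msg):
        self._incoming.put(msg)

    def _poll(self):
        while not self._stopping.is_set():
            try:
                msg = self._incoming.get(timeout=0.05)
            except queue.Empty:
                continue
            self._work_executor.submit(self._handler, msg)

    def stop(self):
        # After this returns, no new messages are taken off the queue.
        self._stopping.set()
        self._poller.join()

    def wait(self):
        # Block until submitted handlers finish, then release the "connection".
        self._work_executor.shutdown(wait=True)
        self.connection_open = False


handled = []
handled_lock = threading.Lock()


def slow_handler(msg):
    time.sleep(0.05)  # simulate work that outlives stop()
    with handled_lock:
        handled.append(msg)


server = ToyServer(slow_handler)
server.start()
for i in range(3):
    server.deliver(i)
time.sleep(0.1)  # give the poller a moment to pick messages up
server.stop()    # polling has ended; handlers may still be running
server.wait()    # all submitted handlers are done and the connection is closed
print(sorted(handled), server.connection_open)
```

Calling `stop()` alone would leave the worker pool and connection in use, which matches the docstrings above.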

RUIJIE YUAN (cnjie0616) wrote :

Ideally, we would start the listener directly in the current thread, so that the return value is the listener itself. That way, we can stop the listener when needed.
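The suggestion can be sketched as follows. `FakeListener` is a hypothetical stand-in exposing only `start()`, `stop()` and `wait()` like an oslo.messaging listener, so the sketch runs without a broker; `HealthManagerSketch`, `add_listener` and `remove_listener` are illustrative names, not senlin's API. The registry keeps the listener object itself per cluster, so removing a cluster can call `stop()` and then `wait()`, as the quoted docstrings prescribe.

```python
import threading


class FakeListener:
    """Hypothetical stand-in for an oslo.messaging notification listener."""

    def __init__(self, cluster_id):
        self.cluster_id = cluster_id
        self.state = "created"

    def start(self):
        self.state = "started"

    def stop(self):
        self.state = "stopped"

    def wait(self):
        self.state = "cleaned-up"


class HealthManagerSketch:
    """Keeps the listener object, not the launching thread, per cluster."""

    def __init__(self, listener_factory):
        self._factory = listener_factory
        self._listeners = {}
        self._lock = threading.Lock()

    def add_listener(self, cluster_id):
        listener = self._factory(cluster_id)
        listener.start()  # returns at once; polling runs on the driver's executor
        with self._lock:
            self._listeners[cluster_id] = listener
        return listener

    def remove_listener(self, cluster_id):
        with self._lock:
            listener = self._listeners.pop(cluster_id, None)
        if listener is None:
            return False
        listener.stop()  # no new messages are handled
        listener.wait()  # in-flight messages finish, resources are released
        return True


hm = HealthManagerSketch(FakeListener)
listener = hm.add_listener("cluster-1")
print(listener.state)                   # started
print(hm.remove_listener("cluster-1"))  # True
print(listener.state)                   # cleaned-up
print(hm.remove_listener("cluster-1"))  # False
```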

chenyb4 (chenyb4) wrote :

Yes, I have tested it; the oslo.messaging listener does stop.

Example:

http://paste.ubuntu.com/26011183/

I will do a simple test of it.

chenyb4 (chenyb4) wrote :

We also need to handle multi-cluster environments, where the listener of one cluster consumes a message and the listeners of other clusters never receive it from the queue.
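One plausible reading of this problem rests on the documented oslo.messaging pool semantics: listeners that share a target and a pool name split the notifications between them (each notification goes to only one listener in the pool), while each distinct pool receives its own copy. Since every cluster's listener above uses `pool="senlin-listeners"`, a notification about cluster A may be taken by cluster B's listener and discarded. The toy model below illustrates this; `ToyNotificationBus`, its round-robin choice, and the per-cluster pool names are assumptions for illustration, not the fix adopted in senlin.

```python
from collections import defaultdict
from itertools import cycle


class ToyNotificationBus:
    """Toy model of pool semantics: each pool gets one copy of every
    notification; listeners within one pool take turns (round-robin)."""

    def __init__(self):
        self._pools = defaultdict(list)
        self._turns = {}

    def subscribe(self, pool, listener):
        self._pools[pool].append(listener)
        # Restart the rotation so it includes the new member.
        self._turns[pool] = cycle(list(self._pools[pool]))

    def publish(self, notification):
        for pool in self._pools:
            next(self._turns[pool])(notification)


def make_cluster_listener(cluster_id, inbox):
    def on_message(notification):
        # Each cluster only acts on notifications about its own nodes.
        if notification["cluster_id"] == cluster_id:
            inbox.append(notification["event"])
    return on_message


# Every cluster in one shared pool, as with pool="senlin-listeners".
shared_a, shared_b = [], []
bus = ToyNotificationBus()
bus.subscribe("senlin-listeners", make_cluster_listener("a", shared_a))
bus.subscribe("senlin-listeners", make_cluster_listener("b", shared_b))

# One pool per cluster (hypothetical naming scheme).
own_a, own_b = [], []
per_cluster = ToyNotificationBus()
per_cluster.subscribe("senlin-listeners-a", make_cluster_listener("a", own_a))
per_cluster.subscribe("senlin-listeners-b", make_cluster_listener("b", own_b))

for _ in range(2):
    event = {"cluster_id": "a", "event": "node-deleted"}
    bus.publish(event)
    per_cluster.publish(event)

print(shared_a)  # ['node-deleted']: the second copy went to cluster b and was dropped
print(own_a)     # ['node-deleted', 'node-deleted']
```

Separate pools trade lost notifications for duplicated delivery: every cluster's listener then receives every notification on the topic and must filter out the ones that do not concern it.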

chenyb4 (chenyb4) on 2017-11-22
Changed in senlin:
assignee: nobody → chenyb4 (chenyb4)
Changed in senlin:
status: New → In Progress