Calling waiters.wait_for_any() blocks if future has called Condition.wait()

Bug #1848457 reported by Dantali0n
Affects: futurist
Status: Opinion
Importance: Medium
Assigned to: Herve Beraud

Bug Description

For additional reference see: https://stackoverflow.com/questions/58410610/calling-condition-wait-inside-thread-causes-retrieval-of-any-future-to-block-o

I created my own reentrant read-write lock which uses Condition.wait() to wait on pending locks.
This lock is shared between two tasks that are submitted to a GreenThreadPoolExecutor.

Since one of the tasks tries to acquire the read lock and the other the write lock, one of the two will block on Condition.wait().

When waiters.wait_for_any() is called, the expected behavior is that the future that has successfully run is returned, since its execution has finished, while the other remains undone because it is blocked on Condition.wait(). However, waiters.wait_for_any() blocks indefinitely and the timeout parameter is effectively ignored.

I believe this behavior is incorrect: one of the futures has finished successfully and should therefore be returned by wait_for_any().
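
For reference, the call shape this report assumes is roughly the following (a sketch only, assuming the (done, not_done) return value and timeout parameter used in the test case further below):

from futurist import waiters

# done should contain the future that acquired its lock and returned,
# not_done should contain the future still blocked in Condition.wait();
# at worst the call should return after ~5 seconds instead of hanging.
done, not_done = waiters.wait_for_any(futures, timeout=5)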

The rest of this report describes the classes, methods, and calls that lead to this behavior, starting with large parts of the ReentrantReadWriteLock:

# Imports and logger setup assumed for this excerpt (not shown in the original snippet):
import logging
from threading import Condition, RLock

LOG = logging.getLogger(__name__)


class ReentrantReadWriteLock(object):
    def __init__(self):

        self._read_lock = RLock()
        self._write_lock = RLock()
        self._rcondition = Condition(lock=self._read_lock)
        self._wcondition = Condition(lock=self._write_lock)
        self._num_readers = 0
        self._wants_write = False

    def read_acquire(self, blocking=True, timeout=-1):
        waitout = None if timeout == -1 else timeout
        LOG.warning("read waitoit: {0}".format(waitout))
        first_it = True
        try:
            if self._read_lock.acquire(blocking, timeout):
                LOG.warning("read internal lock acquired")
                while self._wants_write:
                    LOG.warning("read wants write true")
                    if not blocking or not first_it:
                        LOG.warning("read non blocking or second it")
                        return False
                    LOG.warning("read wait")
                    self._rcondition.wait(waitout)
                    first_it = False
                LOG.warning("read acquired lock")
                self._num_readers += 1
                return True
            LOG.warning("read internal lock failed")
            return False
        finally:
            self._read_lock.release()

    def write_acquire(self, blocking=True, timeout=-1):
        waitout = None if timeout == -1 else timeout
        LOG.warning("write waitoit: {0}".format(waitout))
        first_it = True
        try:
            if self._write_lock.acquire(blocking, timeout):
                LOG.warning("write internal lock acquired")
                while self._num_readers > 0 or self._wants_write:
                    LOG.warning("write wants write true or num read")
                    if not blocking or not first_it:
                        LOG.warning("write non blocking or second it")
                        return False
                    LOG.warning("write wait")
                    self._wcondition.wait(waitout)
                    first_it = False
                LOG.warning("write acquired lock")
                self._wants_write = True
                return True
            LOG.warning("write internal lock failed")
            return False
        finally:
            self._write_lock.release()

This is my test case; it blocks on results = waiters.wait_for_any(futures). Once again, specifying the timeout parameter does not resolve the issue:

def get_read(self, rrwlock):
    return rrwlock.read_acquire()

def get_write(self, rrwlock):
    return rrwlock.write_acquire()

def test():
    self._threadpool = futurist.GreenThreadPoolExecutor(max_workers=4)
    rrwlock = ReentrantReadWriteLock()
    futures = []
    futures.append(self._threadpool.submit(self.get_read, rrwlock))
    futures.append(self._threadpool.submit(self.get_write, rrwlock))

    # Get the results and verify only one of the calls succeeded
    # assert that the other call is still pending
    results = waiters.wait_for_any(futures)
    self.assertTrue(results[0].pop().result())
    self.assertEqual(1, len(results[1]))

Revision history for this message
Dantali0n (dantalion) wrote :

This bug seems very related, even though it is closed: https://bugs.python.org/issue20319

It describes how concurrent.futures.wait() can block forever even if futures have finished. It's closed, but based on the discussion I wouldn't actually call it resolved.

I have tracked the blocking on the main thread to waiter.event.wait(timeout) at line 208 of waiters.py.
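
For context, a minimal sketch of the waiter pattern under discussion (names are illustrative and not futurist's actual internals): a wait-for-any helper installs an event on each future and then parks on that event, which is the waiter.event.wait(timeout) call mentioned above.

import threading

def wait_for_any_sketch(futures, timeout=None):
    # Illustrative only: roughly how wait-for-any helpers are structured.
    event = threading.Event()

    # Wake the waiter as soon as any future completes; add_done_callback
    # fires immediately for futures that are already done.
    for fut in futures:
        fut.add_done_callback(lambda _f: event.set())

    # This is where the caller parks -- the equivalent of the
    # waiter.event.wait(timeout) line referenced above. If the event is
    # never set, the call blocks until the timeout (or forever without one).
    event.wait(timeout)

    done = {f for f in futures if f.done()}
    return done, set(futures) - done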

Revision history for this message
Herve Beraud (herveberaud) wrote :

Hello

The Python bug seems to have been fixed since 2014, but `futurist` implements the same code, which has not been fixed:
- https://github.com/openstack/futurist/blob/6842b2d35a14999531759fd206b735b1bedcce34/futurist/waiters.py#L190
- https://github.com/openstack/futurist/blob/6842b2d35a14999531759fd206b735b1bedcce34/futurist/waiters.py#L210

I just submitted a fix:
- https://review.opendev.org/689691

Also, which versions of Python and futurist are you using here?

Revision history for this message
Dantali0n (dantalion) wrote :

Ah, so the fix for the bug I referenced was merged into Python. I am new to Python's bug tracker, so I couldn't really follow. I am sure I will find some time to review your patch today. Thanks for submitting it.

For what it's worth, I am using both Python 3.6 and 3.7 with futurist 1.9. For further reference, the entire repository is here: https://github.com/Dantali0n/radloggerpy

Revision history for this message
Herve Beraud (herveberaud) wrote :

Yeah, the Python patch was merged a long time ago:
- https://hg.python.org/cpython/rev/0bcf23a52d55
- https://github.com/python/cpython/commit/2b754f49a57c4f625b73e703456bd5ff04496416

Thanks for your reply concerning the versions; I asked just to be sure about the context here.

Changed in futurist:
assignee: nobody → Herve Beraud (herveberaud)
Revision history for this message
Dantali0n (dantalion) wrote :

Unfortunately, the submitted patch does not resolve the issue. When tracing with PDB I can see that execution blocks indefinitely on the call to waiter.event.wait(timeout) at line 209 of waiters.py.

Revision history for this message
Herve Beraud (herveberaud) wrote :

Thanks for your feedback.
I will run more tests then.

Revision history for this message
Herve Beraud (herveberaud) wrote :

So, possibly the culprit here is eventlet.

Your execution seems to be stuck inside eventlet [1].

In this case, if I understand it correctly, it's more related to a coroutine that doesn't call the send method. I'm not sure this is related to a lock.

Can you try to move your breakpoint there [1] and trace the execution step by step, if possible?

Anyway, I think my initial changes can help to improve things.

[1] https://github.com/eventlet/eventlet/blob/master/eventlet/event.py#L95
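
To make that suggestion concrete, here is a minimal sketch of the eventlet Event semantics being referred to (not the reporter's code): wait() only returns once some green thread calls send(), so a waiter whose producer never sends stays parked forever.

import eventlet
from eventlet import event

evt = event.Event()

def producer():
    eventlet.sleep(0.1)   # simulate some work, yielding to the hub
    evt.send("finished")  # wakes every green thread parked in evt.wait()

eventlet.spawn(producer)
# Returns "finished" once send() is called; if nothing ever calls send(),
# this wait() never returns.
print(evt.wait())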

Revision history for this message
Dantali0n (dantalion) wrote :

Yes, I think the patch you made should be merged regardless of whether it fixes the issue.

I traced waiter.event.wait(timeout) all the way down to where it blocks, which is line 296 of threading.py, in waiter.acquire().

https://github.com/python/cpython/blob/master/Lib/threading.py#L297
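
For readers following along, here is a toy condition variable that mimics the relevant part of threading.Condition (a simplified sketch, not the real source): wait() allocates a fresh low-level lock, acquires it once, drops the outer lock, and then blocks on a second acquire until notify() releases it. That second acquire is the waiter.acquire() the trace ends in.

import _thread

class MiniCondition(object):
    # Toy condition variable mimicking threading.Condition.wait()/notify().
    def __init__(self, lock):
        self._lock = lock
        self._waiters = []

    def wait(self, timeout=None):
        waiter = _thread.allocate_lock()
        waiter.acquire()              # first acquire always succeeds
        self._waiters.append(waiter)
        self._lock.release()          # drop the outer lock while waiting
        try:
            if timeout is None:
                waiter.acquire()      # <-- the blocking acquire seen in the trace
            else:
                waiter.acquire(True, timeout)
        finally:
            self._lock.acquire()      # reacquire the outer lock before returning

    def notify(self):
        if self._waiters:
            self._waiters.pop(0).release()   # releasing the waiter lock wakes wait()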

Revision history for this message
Dantali0n (dantalion) wrote :

This behavior is not limited to futurist but extends to Python itself. I already expected this, but now I have a demo to prove it: https://github.com/Dantali0n/bug1848457

My suspicion is that this simply cannot be avoided with CPython, as it is the GIL that is causing it.

Revision history for this message
Herve Beraud (herveberaud) wrote :

Interesting, I'll take a look at your POC.

Then, if it's not a futurist issue, I will move the status of this bug to "Opinion" to keep the discussion open.

Changed in futurist:
status: New → Opinion
Revision history for this message
Herve Beraud (herveberaud) wrote :

An interesting test you can do is to run your POC against CPython without the GIL (a.k.a. the gilectomy):

- https://github.com/larryhastings/gilectomy

You can compile it, use it as the interpreter for your POC, and observe whether the issue still occurs.

Revision history for this message
Dantali0n (dantalion) wrote :

It still happens, but I understand why now: because it is the last statement, it triggers __exit__, which tries to shut down all threads in the thread pool. That of course blocks indefinitely, since either the read or the write lock is acquired while the other thread still waits trying to acquire it.

So the reason the POC with native Python blocks might be completely different from why futurist blocks. In the futurist example, `results = waiters.wait_for_any(futures)` is clearly not the last statement, so this does not apply.
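
A minimal sketch of the shutdown effect being described, using the standard library executor (the lock and worker are stand-ins, not the reporter's code): leaving the with-block calls shutdown(wait=True) via __exit__, which joins every worker, including one still parked in Condition.wait(), so the script hangs at the end by construction.

from concurrent.futures import ThreadPoolExecutor
import threading

cond = threading.Condition()

def blocked_worker():
    with cond:
        cond.wait()   # never notified in this sketch

with ThreadPoolExecutor(max_workers=2) as pool:
    fut = pool.submit(blocked_worker)
    # On leaving the with-block, __exit__ calls pool.shutdown(wait=True),
    # which joins the worker stuck in cond.wait() -> the program hangs here.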

Revision history for this message
Dantali0n (dantalion) wrote :

I created a spin_for_any loop using native Python concurrency that works as expected both with and without the GIL: https://github.com/Dantali0n/bug1848457/tree/69648d1da5292d8005c62a00df9a0dab77f4e6e0

This shows that calling condition.wait() inside a thread does not block any future.done() calls.
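
For illustration, a polling loop of the kind described might look like this (a sketch, not the code in the linked repository): it repeatedly checks future.done() instead of parking on an event, so a blocked future cannot prevent the finished one from being reported.

import time

def spin_for_any(futures, timeout=None, interval=0.01):
    # Poll until at least one future is done, or until the timeout expires.
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        done = [f for f in futures if f.done()]
        if done or (deadline is not None and time.monotonic() >= deadline):
            not_done = [f for f in futures if not f.done()]
            return done, not_done
        time.sleep(interval)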

Changed in futurist:
importance: Undecided → Medium
Revision history for this message
OpenStack Infra (hudson-openstack) wrote : Related fix merged to futurist (master)

Reviewed: https://review.opendev.org/689691
Committed: https://git.openstack.org/cgit/openstack/futurist/commit/?id=d3cb90562999a7a820f9501a77ee808dc8088c70
Submitter: Zuul
Branch: master

commit d3cb90562999a7a820f9501a77ee808dc8088c70
Author: Hervé Beraud <email address hidden>
Date: Mon Oct 21 10:28:53 2019 +0200

    Fix Calling waiters.wait_for_any() blocks if future has called Condition.wait()

    condition.wait() can block forever even if Futures have completed

    A similar issue was already fixed on cpython few years ago:
    - https://bugs.python.org/issue20319
    - https://hg.python.org/cpython/rev/0bcf23a52d55
    - https://github.com/python/cpython/commit/2b754f49a57c4f625b73e703456bd5ff04496416

    Change-Id: I1fbad12a0d555e199fdf4073db9408a296e7fd8a
    Related-Bug: #1848457

Revision history for this message
Dantali0n (dantalion) wrote :

Hello everyone, it has been a while but I have finally figured it out.

The situation occurs due to the GreenThreadPoolExecutor. The "threads" offered by eventlet are so-called user threads (green threads). The kernel is unaware of their existence, and hence they are all executed on the same physical/logical core. As a result, any blocking/waiting dependency created between these threads that cannot be resolved by scheduling among them (a deadlock) will halt the main thread as well as the spawned threads.

I now believe that this side effect is unavoidable when using user threads. I have updated the demos in the repository so everyone can confirm this for themselves: https://github.com/Dantali0n/bug1848457
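
A minimal sketch of that effect, assuming plain eventlet without monkey patching (the function names are illustrative): every green thread shares one OS thread, so a single worker blocking in a native, non-cooperative call stalls the hub, and even a worker that would finish instantly never gets scheduled.

import threading
import eventlet

cond = threading.Condition()   # native condition: its wait() does not yield to the hub

def blocker():
    with cond:
        cond.wait()            # blocks the one OS thread that runs all green threads

def quick():
    return "done"              # would finish immediately if it were ever scheduled

gt_block = eventlet.spawn(blocker)
gt_quick = eventlet.spawn(quick)

# Switching to the hub runs blocker() first; its native wait() never yields,
# so quick() never runs and this wait() never returns.
print(gt_quick.wait())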
