Infinite loop of exceptions when getting invalid data from redis

Bug #1949950 reported by Gregory Thiemonge
Affects: taskflow
Status: Fix Released
Importance: Undecided
Assigned to: Unassigned

Bug Description

Context: Octavia uses taskflow with jobboard (redis backend).

Inserting a random value into redis breaks Octavia with an infinite loop of exceptions:

In redis-cli:
> HSET octavia_jobboard:listings 12312 4314

Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow [-] Error while running jobs.: taskflow.exceptions.JobFailure: Failed to deserialize object from msgpack blob (of length 4)
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ValueError: Expected msgpack decodable data: unpack(b) received extra data.
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow Traceback (most recent call last):
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow File "/opt/stack/taskflow/taskflow/utils/misc.py", line 319, in decode_msgpack
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow data = msgpackutils.loads(raw_data)
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow File "/home/stack/.local/lib/python3.6/site-packages/oslo_serialization/msgpackutils.py", line 484, in loads
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow return msgpack.unpackb(s, ext_hook=ext_hook, raw=False)
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow File "msgpack/_unpacker.pyx", line 202, in msgpack._cmsgpack.unpackb
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow msgpack.exceptions.ExtraData: unpack(b) received extra data.
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow During handling of the above exception, another exception occurred:
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow Traceback (most recent call last):
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow File "/opt/stack/taskflow/taskflow/jobs/backends/impl_redis.py", line 730, in _loads
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow return misc.decode_msgpack(blob, root_types=root_types)
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow File "/opt/stack/taskflow/taskflow/utils/misc.py", line 323, in decode_msgpack
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow raise ValueError("Expected msgpack decodable data: %s" % e)
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow ValueError: Expected msgpack decodable data: unpack(b) received extra data.
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow The above exception was the direct cause of the following exception:
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow Traceback (most recent call last):
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow File "/opt/stack/octavia/octavia/common/base_taskflow.py", line 186, in run_conductor
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow conductor.run()
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow File "/opt/stack/taskflow/taskflow/conductors/backends/impl_executor.py", line 327, in run
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow max_dispatches=max_dispatches)
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow File "/opt/stack/taskflow/taskflow/conductors/backends/impl_executor.py", line 288, in _run_until_dead
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow for job in job_it:
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow File "/opt/stack/taskflow/taskflow/jobs/base.py", line 336, in __next__
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow ensure_fresh=self.ensure_fresh))
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow File "/opt/stack/taskflow/taskflow/jobs/backends/impl_redis.py", line 848, in <lambda>
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow board_fetch_func=lambda ensure_fresh: self._fetch_jobs())
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow File "/opt/stack/taskflow/taskflow/jobs/backends/impl_redis.py", line 818, in _fetch_jobs
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow job_data = self._loads(raw_posting)
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow File "/opt/stack/taskflow/taskflow/jobs/backends/impl_redis.py", line 737, in _loads
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow " msgpack blob (of length %s)" % len(blob))
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow File "/opt/stack/taskflow/taskflow/exceptions.py", line 52, in raise_with_cause
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow excutils.raise_with_cause(exc_cls, message, *args, **kwargs)
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow File "/home/stack/.local/lib/python3.6/site-packages/oslo_utils/excutils.py", line 142, in raise_with_cause
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow raise exc_cls(message, *args, **kwargs) from kwargs.get('cause')
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow taskflow.exceptions.JobFailure: Failed to deserialize object from msgpack blob (of length 4)
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow ValueError: Expected msgpack decodable data: unpack(b) received extra data.
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow

The exception is raised in RedisJobBoard._loads but is not caught by the caller, and the data is never removed from redis.
The appropriate fix is to catch the exception and delete the data from redis when it is invalid.
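
A minimal sketch of that idea (not the actual taskflow code; the decoder callable and key names are placeholders):

import redis

def fetch_valid_postings(client, listings_key, decode_posting):
    postings = []
    for field, raw in client.hgetall(listings_key).items():
        try:
            postings.append(decode_posting(raw))
        except ValueError:
            # Invalid blob: delete the offending field so the next fetch does
            # not hit the same error again. In taskflow the error can also
            # surface as a JobFailure raised by _loads.
            client.hdel(listings_key, field)
    return postings

# Usage against the devstack setup above (assumes a local redis server):
# client = redis.StrictRedis()
# fetch_valid_postings(client, 'octavia_jobboard:listings', some_decoder)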

Then, pushing a valid msgpack buffer with incomplete data into redis triggers other errors:

Adding this msgpack-encoded dict to redis:
{
   'created_on': datetime.datetime.now(),
   'uuid': 'fesdfewsf',
   'name': 'foo',
   'sequence': 1234,
   'book': 'book1'
}
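
For reference, a hedged sketch of how such an entry could be pushed (assumptions: oslo.serialization and redis-py are installed, and the default devstack hash name octavia_jobboard:listings is used; the field name is arbitrary for this reproduction):

import datetime

import redis
from oslo_serialization import msgpackutils

# 'book' is a plain string here, while taskflow expects a dict carrying a
# 'uuid' key; that mismatch is what triggers the AttributeError below.
payload = msgpackutils.dumps({
    'created_on': datetime.datetime.now(),
    'uuid': 'fesdfewsf',
    'name': 'foo',
    'sequence': 1234,
    'book': 'book1',
})

client = redis.StrictRedis()
client.hset('octavia_jobboard:listings', 'fesdfewsf', payload)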

Nov 05 09:18:33 devstack1 octavia-worker[1265770]: WARNING taskflow.conductors.backends.impl_executor [-] Job dispatching failed: RedisJob: foo (priority=JobPriority.NORMAL, uuid=fesdfewsf, details={}): AttributeError: 'str' object has no attribute 'get'
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor Traceback (most recent call last):
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor File "/opt/stack/taskflow/taskflow/conductors/backends/impl_executor.py", line 241, in _on_job_done
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor consume = fut.result()
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 425, in result
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor return self.__get_result()
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 384, in __get_result
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor raise self._exception
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor File "/usr/local/lib/python3.6/site-packages/futurist/_utils.py", line 49, in run
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor result = self.fn(*self.args, **self.kwargs)
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor File "/opt/stack/taskflow/taskflow/conductors/backends/impl_executor.py", line 151, in _dispatch_job
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor engine = self._engine_from_job(job)
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor File "/opt/stack/taskflow/taskflow/conductors/base.py", line 114, in _engine_from_job
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor flow_detail = self._flow_detail_from_job(job)
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor File "/opt/stack/taskflow/taskflow/conductors/base.py", line 91, in _flow_detail_from_job
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor book = job.book
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor File "/opt/stack/taskflow/taskflow/jobs/base.py", line 211, in book
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor self._book = self._load_book()
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor File "/opt/stack/taskflow/taskflow/jobs/base.py", line 252, in _load_book
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor book_uuid = self.book_uuid
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor File "/opt/stack/taskflow/taskflow/jobs/base.py", line 223, in book_uuid
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor return self._book_data.get('uuid')
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor AttributeError: 'str' object has no attribute 'get'
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor

The data is not consumed (it is abandoned) and is not removed from the redis DB, causing a loop of exceptions as well.
Those entries should be discarded from the DB when an error occurs.
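
The second merged fix below follows this approach. A minimal sketch of the idea (placeholder names throughout, not the actual patch; it assumes the jobboard's trash() API as one way to purge a claimed job):

def dispatch_or_discard(jobboard, job, dispatch, owner='octavia-worker'):
    try:
        dispatch(job)
    except (AttributeError, KeyError):
        # Invalid/missing data in the job (e.g. 'book' being a plain string
        # instead of a dict): purge the entry instead of abandoning it, so
        # the conductor does not re-fetch it forever.
        jobboard.trash(job, owner)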

Revision history for this message
OpenStack Infra (hudson-openstack) wrote : Fix proposed to taskflow (master)

Fix proposed to branch: master
Review: https://review.opendev.org/c/openstack/taskflow/+/816798

Changed in taskflow:
status: New → In Progress
Revision history for this message
OpenStack Infra (hudson-openstack) wrote :

Fix proposed to branch: master
Review: https://review.opendev.org/c/openstack/taskflow/+/816799

Revision history for this message
OpenStack Infra (hudson-openstack) wrote : Fix merged to taskflow (master)

Reviewed: https://review.opendev.org/c/openstack/taskflow/+/816798
Committed: https://opendev.org/openstack/taskflow/commit/ba2cbd72f9deee5fedf15864dac41471f4402c82
Submitter: "Zuul (22348)"
Branch: master

commit ba2cbd72f9deee5fedf15864dac41471f4402c82
Author: Gregory Thiemonge <email address hidden>
Date: Fri Nov 5 09:23:35 2021 +0100

    Handle invalid redis entries in RedisJobBoard

    When catching that a redis buffer is not formatted correctly,
    _fetch_jobs now deletes the data from the hashmap, so the same error
    will not occur during the next call to the function.

    The commit also adds JobFailure in the list of caught exceptions (might
    be raised by _loads) in _fetch_jobs

    Closes-Bug: #1949950

    Change-Id: I9078086cd4c3dbfa954422dfa3d5b19fefd7264f

Changed in taskflow:
status: In Progress → Fix Released
Revision history for this message
OpenStack Infra (hudson-openstack) wrote : Fix included in openstack/taskflow 4.6.4

This issue was fixed in the openstack/taskflow 4.6.4 release.

Revision history for this message
OpenStack Infra (hudson-openstack) wrote : Fix merged to taskflow (master)

Reviewed: https://review.opendev.org/c/openstack/taskflow/+/816799
Committed: https://opendev.org/openstack/taskflow/commit/a45f8249c966a7241556a465f8c07c7fbb210277
Submitter: "Zuul (22348)"
Branch: master

commit a45f8249c966a7241556a465f8c07c7fbb210277
Author: Gregory Thiemonge <email address hidden>
Date: Fri Nov 5 09:29:48 2021 +0100

    Delete the job from backend if it cannot be consumed

    In case of invalid/missing data in the job, the job entry should be
    purged from the backend to avoid re-scheduling it later.

    Closes-Bug: #1949950

    Change-Id: I09ab84883cd4dfbab18b56c61e585fd8ac1b6acf

Revision history for this message
OpenStack Infra (hudson-openstack) wrote : Fix included in openstack/taskflow 5.5.0

This issue was fixed in the openstack/taskflow 5.5.0 release.
