Context: Octavia uses taskflow with jobboard (redis backend).
When inserting some random value in redis, it breaks Octavia with an infinite loop of exceptions:
in redis-cli
> HSET octavia_jobboard:listings 12312 4314
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow [-] Error while running jobs.: taskflow.exceptions.JobFailure: Failed to deserialize object from msgpack blob (of length 4)
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ValueError: Expected msgpack decodable data: unpack(b) received extra data.
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow Traceback (most recent call last):
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow File "/opt/stack/taskflow/taskflow/utils/misc.py", line 319, in decode_msgpack
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow data = msgpackutils.loads(raw_data)
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow File "/home/stack/.local/lib/python3.6/site-packages/oslo_serialization/msgpackutils.py", line 484, in loads
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow return msgpack.unpackb(s, ext_hook=ext_hook, raw=False)
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow File "msgpack/_unpacker.pyx", line 202, in msgpack._cmsgpack.unpackb
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow msgpack.exceptions.ExtraData: unpack(b) received extra data.
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow During handling of the above exception, another exception occurred:
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow Traceback (most recent call last):
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow File "/opt/stack/taskflow/taskflow/jobs/backends/impl_redis.py", line 730, in _loads
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow return misc.decode_msgpack(blob, root_types=root_types)
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow File "/opt/stack/taskflow/taskflow/utils/misc.py", line 323, in decode_msgpack
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow raise ValueError("Expected msgpack decodable data: %s" % e)
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow ValueError: Expected msgpack decodable data: unpack(b) received extra data.
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow The above exception was the direct cause of the following exception:
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow Traceback (most recent call last):
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow File "/opt/stack/octavia/octavia/common/base_taskflow.py", line 186, in run_conductor
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow conductor.run()
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow File "/opt/stack/taskflow/taskflow/conductors/backends/impl_executor.py", line 327, in run
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow max_dispatches=max_dispatches)
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow File "/opt/stack/taskflow/taskflow/conductors/backends/impl_executor.py", line 288, in _run_until_dead
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow for job in job_it:
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow File "/opt/stack/taskflow/taskflow/jobs/base.py", line 336, in __next__
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow ensure_fresh=self.ensure_fresh))
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow File "/opt/stack/taskflow/taskflow/jobs/backends/impl_redis.py", line 848, in <lambda>
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow board_fetch_func=lambda ensure_fresh: self._fetch_jobs())
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow File "/opt/stack/taskflow/taskflow/jobs/backends/impl_redis.py", line 818, in _fetch_jobs
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow job_data = self._loads(raw_posting)
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow File "/opt/stack/taskflow/taskflow/jobs/backends/impl_redis.py", line 737, in _loads
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow " msgpack blob (of length %s)" % len(blob))
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow File "/opt/stack/taskflow/taskflow/exceptions.py", line 52, in raise_with_cause
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow excutils.raise_with_cause(exc_cls, message, *args, **kwargs)
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow File "/home/stack/.local/lib/python3.6/site-packages/oslo_utils/excutils.py", line 142, in raise_with_cause
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow raise exc_cls(message, *args, **kwargs) from kwargs.get('cause')
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow taskflow.exceptions.JobFailure: Failed to deserialize object from msgpack blob (of length 4)
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow ValueError: Expected msgpack decodable data: unpack(b) received extra data.
Nov 05 09:13:50 devstack1 octavia-worker[1265770]: ERROR octavia.common.base_taskflow
The exception is raised in RedisJobBoard._loads but is not caught by the caller and the data is never removed from redis.
The appropriate fix would be to catch the exception and delete the data from redis if it is invalid.
Then when pushing into redis a valid msgpack buffer but with incomplete data, it triggers some other errors:
Adding this msgpack encoded dict in redis:
{
'created_on': datetime.datetime.now(),
'uuid': 'fesdfewsf',
'name': 'foo',
'sequence': 1234,
'book': 'book1'
}
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: WARNING taskflow.conductors.backends.impl_executor [-] Job dispatching failed: RedisJob: foo (priority=JobPriority.NORMAL, uuid=fesdfewsf, details={}): AttributeError: 'str' object has no attribute 'get'
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor Traceback (most recent call last):
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor File "/opt/stack/taskflow/taskflow/conductors/backends/impl_executor.py", line 241, in _on_job_done
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor consume = fut.result()
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 425, in result
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor return self.__get_result()
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 384, in __get_result
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor raise self._exception
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor File "/usr/local/lib/python3.6/site-packages/futurist/_utils.py", line 49, in run
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor result = self.fn(*self.args, **self.kwargs)
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor File "/opt/stack/taskflow/taskflow/conductors/backends/impl_executor.py", line 151, in _dispatch_job
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor engine = self._engine_from_job(job)
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor File "/opt/stack/taskflow/taskflow/conductors/base.py", line 114, in _engine_from_job
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor flow_detail = self._flow_detail_from_job(job)
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor File "/opt/stack/taskflow/taskflow/conductors/base.py", line 91, in _flow_detail_from_job
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor book = job.book
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor File "/opt/stack/taskflow/taskflow/jobs/base.py", line 211, in book
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor self._book = self._load_book()
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor File "/opt/stack/taskflow/taskflow/jobs/base.py", line 252, in _load_book
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor book_uuid = self.book_uuid
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor File "/opt/stack/taskflow/taskflow/jobs/base.py", line 223, in book_uuid
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor return self._book_data.get('uuid')
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor AttributeError: 'str' object has no attribute 'get'
Nov 05 09:18:33 devstack1 octavia-worker[1265770]: ERROR taskflow.conductors.backends.impl_executor
The data is not consumed (it is abandoned) and it is not removed from the redis DB, causing a loop of exception as well.
Those requests should be discarded from the DB when an error occurs.
Fix proposed to branch: master /review. opendev. org/c/openstack /taskflow/ +/816798
Review: https:/