From 3dbc6645571b07cbd30f9f5ec4382cb4bd76dcec Mon Sep 17 00:00:00 2001
From: Erno Kuvaja
Date: Thu, 6 Oct 2022 10:24:56 +0100
Subject: [PATCH] SEC WIP: async hashing on location add

Change-Id: I7f56ff06bfe8777f94ef0ce9dedf37ddb6fd84b7
---
 glance/api/v2/images.py             |  35 ++++-
 glance/async_/flows/add_location.py | 218 ++++++++++++++++++++++++++++
 2 files changed, 249 insertions(+), 4 deletions(-)
 create mode 100644 glance/async_/flows/add_location.py

diff --git a/glance/api/v2/images.py b/glance/api/v2/images.py
index 95cfe7b9c..6b0cc2265 100644
--- a/glance/api/v2/images.py
+++ b/glance/api/v2/images.py
@@ -674,7 +674,7 @@ class ImagesController(object):
             json_schema_version = change.get('json_schema_version', 10)
             if path_root == 'locations':
                 api_pol.update_locations()
-                self._do_add_locations(image, path[1], value)
+                self._do_add_locations(image, path[1], value, req)
             else:
                 api_pol.update_property(path_root, value)
                 if ((hasattr(image, path_root) or
@@ -1043,7 +1043,7 @@ class ImagesController(object):
             raise webob.exc.HTTPBadRequest(
                 explanation=encodeutils.exception_to_unicode(ve))
 
-    def _do_add_locations(self, image, path_pos, value):
+    def _do_add_locations(self, image, path_pos, value, req):
         if CONF.show_multiple_locations == False:
             msg = _("It's not allowed to add locations if locations are "
                     "invisible.")
@@ -1067,11 +1067,38 @@
             msg = _("Invalid position for adding a location.")
             raise webob.exc.HTTPBadRequest(explanation=msg)
         try:
-            image.locations.insert(pos, updated_location)
+            ctxt = req.context
+            # NOTE(jokke): We might need the admin context here to get around
+            # the onion, pending testing, just here as a reminder for myself
+            admin_context = ctxt.elevated()
+            executor_factory = self.gateway.get_task_executor_factory(
+                ctxt, admin_context=admin_context)
+            task_factory = self.gateway.get_task_factory(ctxt)
+            task_repo = self.gateway.get_task_repo(ctxt)
+
+            # NOTE(jokke): As the taskflow will activate the image for us
+            # we just add the possible metadata from val_data here.
             if image.status == 'queued':
                 for k, v in val_data.items():
                     setattr(image, k, v)
-                image.status = 'active'
+
+            location_task = task_factory.new_task(task_type='add_location',
+                                                  image_id=image.image_id,
+                                                  location=updated_location,
+                                                  pos=pos,
+                                                  request_id=ctxt.request_id)
+
+            task_repo.add(location_task)
+            task_executor = executor_factory.new_task_executor(ctxt)
+            pool = common.get_thread_pool("tasks_pool")
+            pool.spawn(location_task.run, task_executor)
+        except exception.Forbidden as e:
+            LOG.debug("User not permitted to create the add location task.")
+            raise webob.exc.HTTPForbidden(explanation=e.msg)
+        except exception.Conflict as e:
+            raise webob.exc.HTTPConflict(explanation=e.msg)
+        except exception.InvalidImageStatusTransition as e:
+            raise webob.exc.HTTPConflict(explanation=e.msg)
         except (exception.BadStoreUri, exception.DuplicateLocation) as e:
             raise webob.exc.HTTPBadRequest(explanation=e.msg)
         except ValueError as e:    # update image status failed.
diff --git a/glance/async_/flows/add_location.py b/glance/async_/flows/add_location.py
new file mode 100644
index 000000000..77e5c78fc
--- /dev/null
+++ b/glance/async_/flows/add_location.py
@@ -0,0 +1,218 @@
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import time
+
+import glance_store as store_api
+from oslo_config import cfg
+from oslo_log import log as logging
+from taskflow.patterns import linear_flow as lf
+from taskflow import retry
+from taskflow import task
+
+from glance.common import exception
+from glance.common import utils as sutils
+from glance.i18n import _
+
+
+LOG = logging.getLogger(__name__)
+
+
+CONF = cfg.CONF
+
+
+location_process_opts = [
+    cfg.StrOpt('disable_location_hashing',
+               default="No",
+               choices=('No',
+                        'Yes, performance over security, '
+                        'I know what I am doing'),
+               help=_("""
+Disable calculating and verifying the image multihash when adding locations.
+
+Related options:
+    * show_image_direct_url
+    * show_multiple_locations
+""")),
+]
+
+CONF.register_opts(location_process_opts)
+
+
+class _AddLocation(task.Task):
+
+    def __init__(self, task_id, task_type, task_repo, image_id, image_repo,
+                 location, pos, set_active):
+        self.task_id = task_id
+        self.task_type = task_type
+        self.task_repo = task_repo
+        self.image_id = image_id
+        self.image_repo = image_repo
+        self.location = location
+        self.pos = pos
+        self.set_active = set_active
+        super(_AddLocation, self).__init__(
+            name='%s-AddLocation-%s' % (task_type, task_id))
+
+    def execute(self):
+        image = self.image_repo.get(self.image_id)
+        if image.status == "deleted":
+            raise exception.ImportTaskError("Image has been deleted.")
+        elif image.status == "queued" and self.set_active:
+            try:
+                image.locations.insert(self.pos, self.location)
+                image.status = 'active'
+            except (exception.BadStoreUri, exception.DuplicateLocation) as e:
+                LOG.warning("%s failed due to error %s", self.name, e.msg)
+        elif image.status in ["queued", "active"]:
+            try:
+                image.locations.insert(self.pos, self.location)
+            except (exception.BadStoreUri, exception.DuplicateLocation) as e:
+                LOG.warning("%s failed due to error %s", self.name, e.msg)
+        self.image_repo.save(image)
+
+
+class _ImageHashAndSize(task.Task):
+
+    def __init__(self, task_id, task_type, task_repo, image_id, image_repo,
+                 location):
+        self.task_id = task_id
+        self.task_type = task_type
+        self.task_repo = task_repo
+        self.image_id = image_id
+        self.image_repo = image_repo
+        self.location = location
+        image = image_repo.get(image_id)
+        self.size = getattr(image, "size", 0)
+        super(_ImageHashAndSize, self).__init__(
+            name='%s-ImageHashAndSize-%s' % (task_type, task_id))
+
+    def execute(self):
+        image = self.image_repo.get(self.image_id)
+        if image.status == "deleted":
+            raise exception.ImportTaskError("Image has been deleted.")
+
+        if getattr(image, "os_hash_algo", False):
+            self.hash_algo = image.os_hash_algo
+            self._check_existing()
+        else:
+            self.hash_algo = CONF['hashing_algorithm']
+            self._create_new()
+
+    def _check_existing(self):
+        os_hash_value = sutils.get_hasher(self.hash_algo, False)
+        size_read = 0
+        if self.location['metadata'].get('store', False):
+            i_data, i_size = store_api.get(self.location['url'],
+                                           self.location['metadata']['store'])
+            while True:
+                try:
+                    buffer = next(i_data)
+                    size_read += len(buffer)
+                    os_hash_value.update(buffer)
+                except StopIteration:
+                    break
+                if size_read > CONF['image_size_cap']:
+                    raise exception.ImageSizeLimitExceeded(
+                        "Read surpassed image_size_cap while hashing")
+                # NOTE(jokke): Give the interpreter a chance to schedule
+                # something else in between to avoid blocking too much
+                time.sleep(0)
+
+        image = self.image_repo.get(self.image_id)
+        if os_hash_value.hexdigest() != image.os_hash_value:
+            # NOTE(jokke): If the hashes do not match, something is clearly
+            # wrong. We do not want some automation or malicious user to
+            # DOS the g-api process by resubmitting the location, just
+            # because we silently failed to add it.
+            image.status = "deactivated"
+            self.image_repo.save(image)
+            LOG.warning("%s: 'os_hash_value' does not match on newly added"
+                        " location %s. Image deactivated.",
+                        self.name, self.location['url'])
+        if self.size == 0:
+            image.size = size_read
+            self.image_repo.save(image)
+
+    def _create_new(self):
+        os_hash_value = sutils.get_hasher(self.hash_algo, False)
+        if self.location['metadata'].get('store', False):
+            i_data, i_size = store_api.get(self.location['url'],
+                                           self.location['metadata']['store'])
+            while True:
+                try:
+                    buffer = next(i_data)
+                    self.size += len(buffer)
+                    os_hash_value.update(buffer)
+                except StopIteration:
+                    break
+                if self.size > CONF['image_size_cap']:
+                    raise exception.ImageSizeLimitExceeded(
+                        "Read surpassed image_size_cap while hashing")
+                # NOTE(jokke): Give the interpreter a chance to schedule
+                # something else in between to avoid blocking too much
+                time.sleep(0)
+
+        image = self.image_repo.get(self.image_id)
+        image.os_hash_algo = self.hash_algo
+        image.os_hash_value = os_hash_value.hexdigest()
+        image.size = self.size
+        self.image_repo.save(image)
+
+
+def get_flow(**kwargs):
+    """Return the task flow for adding a location to an image.
+
+    :param task_id: Task ID
+    :param task_type: Type of the task
+    :param task_repo: Task repo
+    :param image_repo: Image repo
+    :param image_id: ID of the image
+    :param location: Location to be added
+    :param pos: Position in the location list where the location is added
+    :param set_active: Set image active on location add
+    """
+
+    # NOTE(jokke): We might want to do quota checks here too, but it's
+    # not relevant for fixing bug #1990157 and thus not part of this patch.
+    task_id = kwargs.get("task_id")
+    task_type = kwargs.get("task_type")
+    task_repo = kwargs.get("task_repo")
+    image_repo = kwargs.get("image_repo")
+    image_id = kwargs.get("image_id")
+    location = kwargs.get("location")
+    pos = kwargs.get("pos")
+    set_active = kwargs.get("set_active", True)
+
+    flow = lf.Flow(task_type, retry=retry.AlwaysRevert())
+
+    image = image_repo.get(image_id)
+    if getattr(image, 'os_hash_value', False):
+        if CONF['disable_location_hashing'] == "No":
+            task = _ImageHashAndSize(task_id, task_type, task_repo,
+                                     image_id, image_repo, location)
+            flow.add(task)
+        task = _AddLocation(task_id, task_type, task_repo, image_id,
+                            image_repo, location, pos, set_active)
+        flow.add(task)
+    else:
+        task = _AddLocation(task_id, task_type, task_repo, image_id,
+                            image_repo, location, pos, set_active)
+        flow.add(task)
+        if CONF['disable_location_hashing'] == "No":
+            task = _ImageHashAndSize(task_id, task_type, task_repo,
+                                     image_id, image_repo, location)
+            flow.add(task)
+
+    return flow
-- 
2.37.3
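
Not part of the patch: below is a minimal, standalone sketch of the chunked verify-before-trust hashing that the new _ImageHashAndSize task performs against an already recorded os_hash_value. The function name verify_location_hash and its arguments are illustrative only, not Glance API; it assumes nothing beyond an iterable of byte chunks read from the new location.

    import hashlib

    def verify_location_hash(chunks, hash_algo, expected_hash, size_cap=None):
        # Hash the data chunk by chunk, enforcing an optional size cap, and
        # compare the digest against the hash recorded for the image.
        hasher = hashlib.new(hash_algo)
        size_read = 0
        for chunk in chunks:
            size_read += len(chunk)
            if size_cap is not None and size_read > size_cap:
                raise ValueError("read surpassed the size cap while hashing")
            hasher.update(chunk)
        # A mismatch means the data behind the new location is not the data
        # the stored hash was calculated from, so the location must not be
        # trusted (the patch reacts by deactivating the image).
        return hasher.hexdigest() == expected_hash, size_read

    # Example: two chunks checked against a known sha512 digest.
    expected = hashlib.sha512(b"hello world").hexdigest()
    print(verify_location_hash([b"hello ", b"world"], "sha512", expected))
    # -> (True, 11)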